You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/03/29 08:00:33 UTC

apex-malhar git commit: APEXMALHAR-2453 Added Sort Accumulation for Windowed operators

Repository: apex-malhar
Updated Branches:
  refs/heads/master f2bc30fce -> 611701c3d


APEXMALHAR-2453 Added Sort Accumulation for Windowed operators


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/611701c3
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/611701c3
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/611701c3

Branch: refs/heads/master
Commit: 611701c3df6bf95148fc9e3a6445efeee26581f4
Parents: f2bc30f
Author: ajaygit158 <aj...@gmail.com>
Authored: Tue Mar 21 17:20:19 2017 +0530
Committer: ajaygit158 <aj...@gmail.com>
Committed: Wed Mar 29 12:09:04 2017 +0530

----------------------------------------------------------------------
 .../malhar/lib/window/accumulation/Sort.java    | 106 ++++++++++++
 .../lib/window/accumulation/SortTest.java       | 160 +++++++++++++++++++
 2 files changed, 266 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/611701c3/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Sort.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Sort.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Sort.java
new file mode 100644
index 0000000..7dc684f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Sort.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Sort accumulation
+ */
+@InterfaceStability.Evolving
+public class Sort<T> implements Accumulation<T, List<T>, List<T>>
+{
+  boolean reverseSort;
+  Comparator<T> comparator;
+  Comparator<T> reverseComparator;
+
+  public Sort()
+  {
+    //for kryo
+  }
+
+  /**
+   * @param reverseSort
+   *          sort in order opposite to how the comparator would sort
+   * @param comparator
+   *          comparator to sort the tuples
+   */
+  public Sort(final boolean reverseSort, final Comparator<T> comparator)
+  {
+    this.reverseSort = reverseSort;
+    this.comparator = comparator;
+    this.reverseComparator = Collections.reverseOrder(comparator);
+  }
+
+  @Override
+  public List<T> defaultAccumulatedValue()
+  {
+    return new ArrayList<T>();
+  }
+
+  @Override
+  public List<T> accumulate(List<T> accumulatedValue, T input)
+  {
+    if (comparator == null) {
+      throw new RuntimeException("Comparator not provided, Tuple cannot be compared");
+    }
+    Comparator<T> accComparator = reverseSort ? reverseComparator : comparator;
+    insertElement(accumulatedValue, input, accComparator);
+    return accumulatedValue;
+  }
+
+  @Override
+  public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+  {
+    if (comparator == null) {
+      throw new RuntimeException("Comparator not provided, Tuple cannot be compared");
+    }
+    Comparator<T> accComparator = reverseSort ? reverseComparator : comparator;
+    for (T t : accumulatedValue2) {
+      insertElement(accumulatedValue1, t, accComparator);
+    }
+    return accumulatedValue1;
+  }
+
+  private void insertElement(List<T> accumulatedValue, T element, Comparator<T> comparator)
+  {
+    //binarySearch returns location if input exists else returns (-(insertion point) - 1)
+    int index = Collections.binarySearch(accumulatedValue, element, comparator);
+    index = index >= 0 ? index : (-index - 1);
+    accumulatedValue.add(index, element);
+  }
+
+  @Override
+  public List<T> getOutput(List<T> accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  @Override
+  public List<T> getRetraction(List<T> accumulatedValue)
+  {
+    return new ArrayList<T>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/611701c3/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SortTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SortTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SortTest.java
new file mode 100644
index 0000000..daeb4fc
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SortTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for Sort accumulation
+ */
+public class SortTest
+{
+  public static class TestPojo1
+  {
+    private int uId;
+    private String uName;
+
+    public TestPojo1()
+    {
+
+    }
+
+    public TestPojo1(int id, String name)
+    {
+      this.uId = id;
+      this.uName = name;
+    }
+
+    public int getUId()
+    {
+      return uId;
+    }
+
+    public void setUId(int uId)
+    {
+      this.uId = uId;
+    }
+
+    public String getUName()
+    {
+      return uName;
+    }
+
+    public void setUName(String uName)
+    {
+      this.uName = uName;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      TestPojo1 other = (TestPojo1)obj;
+      if (uId != other.uId) {
+        return false;
+      }
+      if (uName == null) {
+        if (other.uName != null) {
+          return false;
+        }
+      } else if (!uName.equals(other.uName)) {
+        return false;
+      }
+      return true;
+    }
+  }
+
+  @Test
+  public void SortTestAscDesc()
+  {
+    Comparator<TestPojo1> comparator = new Comparator<TestPojo1>()
+    {
+
+      @Override
+      public int compare(TestPojo1 o1, TestPojo1 o2)
+      {
+        if (o1 == null && o2 == null) {
+          return 0;
+        } else if (o1 == null) {
+          return -1;
+        } else if (o2 == null) {
+          return 1;
+        } else if (o1.getUId() != o2.getUId()) {
+          return o1.getUId() - o2.getUId();
+        } else {
+          return o1.getUName().compareTo(o2.getUName());
+        }
+      }
+    };
+    Sort<TestPojo1> sort = new Sort<>(false, comparator);
+    TestPojo1 o1 = new TestPojo1(5, "user1");
+    TestPojo1 o2 = new TestPojo1(15, "user32");
+    TestPojo1 o3 = new TestPojo1(5, "user11");
+    TestPojo1 o4 = new TestPojo1(2, "user12");
+    TestPojo1 o5 = new TestPojo1(15, "user32");
+    List<TestPojo1> ascList = new ArrayList<>();
+    ascList.add(o4);
+    ascList.add(o1);
+    ascList.add(o3);
+    ascList.add(o2);
+    ascList.add(o5);
+    List<TestPojo1> accumulatedValue = sort.defaultAccumulatedValue();
+    accumulatedValue = sort.accumulate(accumulatedValue, o1);
+    accumulatedValue = sort.accumulate(accumulatedValue, o2);
+    accumulatedValue = sort.accumulate(accumulatedValue, o3);
+    accumulatedValue = sort.accumulate(accumulatedValue, o4);
+    accumulatedValue = sort.accumulate(accumulatedValue, o5);
+
+    Iterator<TestPojo1> it = accumulatedValue.iterator();
+    int i = 0;
+    while (it.hasNext()) {
+      Assert.assertEquals(ascList.get(i), it.next());
+      i++;
+    }
+
+    sort = new Sort<>(true, comparator);
+    List<TestPojo1> descAccumulatedValue = sort.defaultAccumulatedValue();
+    descAccumulatedValue = sort.accumulate(descAccumulatedValue, o1);
+    descAccumulatedValue = sort.accumulate(descAccumulatedValue, o2);
+    descAccumulatedValue = sort.accumulate(descAccumulatedValue, o3);
+    descAccumulatedValue = sort.accumulate(descAccumulatedValue, o4);
+    descAccumulatedValue = sort.accumulate(descAccumulatedValue, o5);
+
+    it = descAccumulatedValue.iterator();
+    i = ascList.size() - 1;
+    while (it.hasNext()) {
+      Assert.assertEquals(ascList.get(i), it.next());
+      i--;
+    }
+  }
+}