You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/03/29 08:00:33 UTC
apex-malhar git commit: APEXMALHAR-2453 Added Sort Accumulation for
Windowed operators
Repository: apex-malhar
Updated Branches:
refs/heads/master f2bc30fce -> 611701c3d
APEXMALHAR-2453 Added Sort Accumulation for Windowed operators
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/611701c3
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/611701c3
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/611701c3
Branch: refs/heads/master
Commit: 611701c3df6bf95148fc9e3a6445efeee26581f4
Parents: f2bc30f
Author: ajaygit158 <aj...@gmail.com>
Authored: Tue Mar 21 17:20:19 2017 +0530
Committer: ajaygit158 <aj...@gmail.com>
Committed: Wed Mar 29 12:09:04 2017 +0530
----------------------------------------------------------------------
.../malhar/lib/window/accumulation/Sort.java | 106 ++++++++++++
.../lib/window/accumulation/SortTest.java | 160 +++++++++++++++++++
2 files changed, 266 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/611701c3/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Sort.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Sort.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Sort.java
new file mode 100644
index 0000000..7dc684f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Sort.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Sort accumulation
+ */
+@InterfaceStability.Evolving
+public class Sort<T> implements Accumulation<T, List<T>, List<T>>
+{
+ boolean reverseSort;
+ Comparator<T> comparator;
+ Comparator<T> reverseComparator;
+
+ public Sort()
+ {
+ //for kryo
+ }
+
+ /**
+ * @param reverseSort
+ * sort in order opposite to how the comparator would sort
+ * @param comparator
+ * comparator to sort the tuples
+ */
+ public Sort(final boolean reverseSort, final Comparator<T> comparator)
+ {
+ this.reverseSort = reverseSort;
+ this.comparator = comparator;
+ this.reverseComparator = Collections.reverseOrder(comparator);
+ }
+
+ @Override
+ public List<T> defaultAccumulatedValue()
+ {
+ return new ArrayList<T>();
+ }
+
+ @Override
+ public List<T> accumulate(List<T> accumulatedValue, T input)
+ {
+ if (comparator == null) {
+ throw new RuntimeException("Comparator not provided, Tuple cannot be compared");
+ }
+ Comparator<T> accComparator = reverseSort ? reverseComparator : comparator;
+ insertElement(accumulatedValue, input, accComparator);
+ return accumulatedValue;
+ }
+
+ @Override
+ public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+ {
+ if (comparator == null) {
+ throw new RuntimeException("Comparator not provided, Tuple cannot be compared");
+ }
+ Comparator<T> accComparator = reverseSort ? reverseComparator : comparator;
+ for (T t : accumulatedValue2) {
+ insertElement(accumulatedValue1, t, accComparator);
+ }
+ return accumulatedValue1;
+ }
+
+ private void insertElement(List<T> accumulatedValue, T element, Comparator<T> comparator)
+ {
+ //binarySearch returns location if input exists else returns (-(insertion point) - 1)
+ int index = Collections.binarySearch(accumulatedValue, element, comparator);
+ index = index >= 0 ? index : (-index - 1);
+ accumulatedValue.add(index, element);
+ }
+
+ @Override
+ public List<T> getOutput(List<T> accumulatedValue)
+ {
+ return accumulatedValue;
+ }
+
+ @Override
+ public List<T> getRetraction(List<T> accumulatedValue)
+ {
+ return new ArrayList<T>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/611701c3/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SortTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SortTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SortTest.java
new file mode 100644
index 0000000..daeb4fc
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SortTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for Sort accumulation
+ */
+public class SortTest
+{
+ public static class TestPojo1
+ {
+ private int uId;
+ private String uName;
+
+ public TestPojo1()
+ {
+
+ }
+
+ public TestPojo1(int id, String name)
+ {
+ this.uId = id;
+ this.uName = name;
+ }
+
+ public int getUId()
+ {
+ return uId;
+ }
+
+ public void setUId(int uId)
+ {
+ this.uId = uId;
+ }
+
+ public String getUName()
+ {
+ return uName;
+ }
+
+ public void setUName(String uName)
+ {
+ this.uName = uName;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ TestPojo1 other = (TestPojo1)obj;
+ if (uId != other.uId) {
+ return false;
+ }
+ if (uName == null) {
+ if (other.uName != null) {
+ return false;
+ }
+ } else if (!uName.equals(other.uName)) {
+ return false;
+ }
+ return true;
+ }
+ }
+
+ @Test
+ public void SortTestAscDesc()
+ {
+ Comparator<TestPojo1> comparator = new Comparator<TestPojo1>()
+ {
+
+ @Override
+ public int compare(TestPojo1 o1, TestPojo1 o2)
+ {
+ if (o1 == null && o2 == null) {
+ return 0;
+ } else if (o1 == null) {
+ return -1;
+ } else if (o2 == null) {
+ return 1;
+ } else if (o1.getUId() != o2.getUId()) {
+ return o1.getUId() - o2.getUId();
+ } else {
+ return o1.getUName().compareTo(o2.getUName());
+ }
+ }
+ };
+ Sort<TestPojo1> sort = new Sort<>(false, comparator);
+ TestPojo1 o1 = new TestPojo1(5, "user1");
+ TestPojo1 o2 = new TestPojo1(15, "user32");
+ TestPojo1 o3 = new TestPojo1(5, "user11");
+ TestPojo1 o4 = new TestPojo1(2, "user12");
+ TestPojo1 o5 = new TestPojo1(15, "user32");
+ List<TestPojo1> ascList = new ArrayList<>();
+ ascList.add(o4);
+ ascList.add(o1);
+ ascList.add(o3);
+ ascList.add(o2);
+ ascList.add(o5);
+ List<TestPojo1> accumulatedValue = sort.defaultAccumulatedValue();
+ accumulatedValue = sort.accumulate(accumulatedValue, o1);
+ accumulatedValue = sort.accumulate(accumulatedValue, o2);
+ accumulatedValue = sort.accumulate(accumulatedValue, o3);
+ accumulatedValue = sort.accumulate(accumulatedValue, o4);
+ accumulatedValue = sort.accumulate(accumulatedValue, o5);
+
+ Iterator<TestPojo1> it = accumulatedValue.iterator();
+ int i = 0;
+ while (it.hasNext()) {
+ Assert.assertEquals(ascList.get(i), it.next());
+ i++;
+ }
+
+ sort = new Sort<>(true, comparator);
+ List<TestPojo1> descAccumulatedValue = sort.defaultAccumulatedValue();
+ descAccumulatedValue = sort.accumulate(descAccumulatedValue, o1);
+ descAccumulatedValue = sort.accumulate(descAccumulatedValue, o2);
+ descAccumulatedValue = sort.accumulate(descAccumulatedValue, o3);
+ descAccumulatedValue = sort.accumulate(descAccumulatedValue, o4);
+ descAccumulatedValue = sort.accumulate(descAccumulatedValue, o5);
+
+ it = descAccumulatedValue.iterator();
+ i = ascList.size() - 1;
+ while (it.hasNext()) {
+ Assert.assertEquals(ascList.get(i), it.next());
+ i--;
+ }
+ }
+}