You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:41:53 UTC
[03/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed
checkstyle violations of malhar library module
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java b/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java
index 59b2c4d..10e0f5e 100644
--- a/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java
+++ b/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java
@@ -23,14 +23,16 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.lang.ObjectUtils.Null;
import org.junit.Assert;
import org.junit.Test;
-import com.datatorrent.lib.testbench.CollectorTestSink;
+import org.apache.commons.lang.ObjectUtils.Null;
+
import com.google.common.base.Function;
import com.google.common.collect.Lists;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
/**
* A unit test to test SortedMovingWindow operator can either:
* 1. sort simple comparable tuples
@@ -45,7 +47,8 @@ public class SortedMovingWindowTest
* Test sorting simple comparable tuples within the sliding window
*/
@Test
- public void testSortingSimpleNumberTuple(){
+ public void testSortingSimpleNumberTuple()
+ {
SortedMovingWindow<Integer, Null> smw = new SortedMovingWindow<Integer, Null>();
CollectorTestSink<Object> testSink = new CollectorTestSink<Object>();
smw.outputPort.setSink(testSink);
@@ -73,9 +76,10 @@ public class SortedMovingWindowTest
SortedMovingWindow<Map<String, Integer>, Null> smw = new SortedMovingWindow<Map<String, Integer>, Null>();
- final String[] keys = { "number" };
+ final String[] keys = {"number"};
- smw.setComparator(new Comparator<Map<String, Integer>>() {
+ smw.setComparator(new Comparator<Map<String, Integer>>()
+ {
@Override
public int compare(Map<String, Integer> o1, Map<String, Integer> o2)
{
@@ -89,15 +93,17 @@ public class SortedMovingWindowTest
smw.setWindowSize(2);
// The incoming 6 simple map tuples are disordered among 4 windows
- emitObjects(smw, new Map[][] { createHashMapTuples(keys, new Integer[][] { { 1 }, { 3 } }), createHashMapTuples(keys, new Integer[][] { { 2 }, { 5 } }),
- createHashMapTuples(keys, new Integer[][] { { 4 } }), createHashMapTuples(keys, new Integer[][] { { 6 } }) });
+ emitObjects(smw, new Map[][]{createHashMapTuples(keys, new Integer[][]{{1}, {3}}),
+ createHashMapTuples(keys, new Integer[][]{{2}, {5}}),
+ createHashMapTuples(keys, new Integer[][]{{4}}), createHashMapTuples(keys, new Integer[][]{{6}})});
smw.beginWindow(4);
smw.endWindow();
smw.beginWindow(5);
smw.endWindow();
// The outcome is ordered by the value of the key "number"
- Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, new Integer[][] { { 1 }, { 2 }, { 3 }, { 4 }, { 5 }, { 6 } })), testSink.collectedTuples);
+ Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, new Integer[][]{{1}, {2}, {3}, {4}, {5}, {6}})),
+ testSink.collectedTuples);
}
@@ -110,9 +116,10 @@ public class SortedMovingWindowTest
SortedMovingWindow<Map<String, Object>, String> smw = new SortedMovingWindow<Map<String, Object>, String>();
- final String[] keys = { "name", "number" };
+ final String[] keys = {"name", "number"};
- smw.setComparator(new Comparator<Map<String, Object>>() {
+ smw.setComparator(new Comparator<Map<String, Object>>()
+ {
@Override
public int compare(Map<String, Object> o1, Map<String, Object> o2)
{
@@ -121,12 +128,13 @@ public class SortedMovingWindowTest
}
});
- smw.setFunction(new Function<Map<String,Object>, String>() {
+ smw.setFunction(new Function<Map<String,Object>, String>()
+ {
@Override
public String apply(Map<String, Object> input)
{
// order tuple with same key "name"
- return (String) input.get(keys[0]);
+ return (String)input.get(keys[0]);
}
});
CollectorTestSink<Object> testSink = new CollectorTestSink<Object>();
@@ -135,23 +143,23 @@ public class SortedMovingWindowTest
smw.setWindowSize(2);
// The incoming 9 complex map tuples are disordered with same name among 4 windows
- emitObjects(smw, new Map[][] { createHashMapTuples(keys, new Object[][] { {"bob", 1 }, {"jim", 1 } }), createHashMapTuples(keys, new Object[][] { {"jim", 2 }, { "bob", 3 } }),
- createHashMapTuples(keys, new Object[][] { { "bob", 2 }, { "jim", 4} }), createHashMapTuples(keys, new Object[][] { {"bob", 5}, {"jim", 3 }, {"bob", 4} }) });
+ emitObjects(smw, new Map[][]{createHashMapTuples(keys, new Object[][]{{"bob", 1}, {"jim", 1}}),
+ createHashMapTuples(keys, new Object[][]{{"jim", 2}, {"bob", 3}}),
+ createHashMapTuples(keys, new Object[][]{{"bob", 2}, {"jim", 4}}),
+ createHashMapTuples(keys, new Object[][]{{"bob", 5}, {"jim", 3}, {"bob", 4}})});
smw.beginWindow(4);
smw.endWindow();
smw.beginWindow(5);
smw.endWindow();
// All tuples with same "name" are sorted by key "number"
- Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, new Object[][] { { "bob", 1 }, { "jim", 1 }, { "jim", 2 }, { "bob", 2 },
- { "bob", 3 }, { "jim", 3 }, { "jim", 4 }, { "bob", 4 }, { "bob", 5 } })), testSink.collectedTuples);
+ Assert.assertEquals(Arrays.asList(createHashMapTuples(keys,
+ new Object[][]{{"bob", 1}, {"jim", 1}, {"jim", 2}, {"bob", 2}, {"bob", 3}, {"jim", 3}, {"jim", 4}, {"bob", 4}, {"bob", 5}})), testSink.collectedTuples);
}
-
-
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private void emitObjects(SortedMovingWindow win, Object[][] obj){
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private void emitObjects(SortedMovingWindow win, Object[][] obj)
+ {
for (int i = 0; i < obj.length; i++) {
win.beginWindow(i);
for (int j = 0; j < obj[i].length; j++) {
@@ -161,8 +169,9 @@ public class SortedMovingWindowTest
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private Map[] createHashMapTuples(String[] cols, Object[][] values){
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private Map[] createHashMapTuples(String[] cols, Object[][] values)
+ {
HashMap[] maps = new HashMap[values.length];
int index = -1;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java b/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java
index 3a6f7fe..8d52f7e 100644
--- a/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java
@@ -21,20 +21,21 @@ package com.datatorrent.lib.partitioner;
import java.util.Collection;
import java.util.List;
-import com.google.common.collect.Lists;
-
import org.junit.Assert;
import org.junit.Test;
-import com.datatorrent.api.*;
+import com.google.common.collect.Lists;
+
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Partitioner.Partition;
import com.datatorrent.api.StringCodec.Object2String;
-
-import com.datatorrent.lib.util.TestUtils;
-
import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.util.TestUtils;
public class StatelessPartitionerTest
{
@@ -101,7 +102,7 @@ public class StatelessPartitionerTest
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
Assert.assertEquals("Incorrect number of partitions", 1, newPartitions.size());
- for(Partition<DummyOperator> partition: newPartitions) {
+ for (Partition<DummyOperator> partition : newPartitions) {
Assert.assertEquals("Incorrect cloned value", 5, partition.getPartitionedInstance().getValue());
}
}
@@ -119,7 +120,7 @@ public class StatelessPartitionerTest
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0));
Assert.assertEquals("Incorrect number of partitions", 5, newPartitions.size());
- for(Partition<DummyOperator> partition: newPartitions) {
+ for (Partition<DummyOperator> partition : newPartitions) {
Assert.assertEquals("Incorrect cloned value", 5, partition.getPartitionedInstance().getValue());
}
}
@@ -149,7 +150,7 @@ public class StatelessPartitionerTest
partitions.add(mockPartition);
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
- new PartitioningContextImpl(null, 5));
+ new PartitioningContextImpl(null, 5));
Assert.assertEquals("after partition", 5, newPartitions.size());
}
@@ -172,7 +173,7 @@ public class StatelessPartitionerTest
}
Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions,
- new PartitioningContextImpl(null, 1));
+ new PartitioningContextImpl(null, 1));
Assert.assertEquals("after partition", 1, newPartitions.size());
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java b/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java
index 16647ab..6eaebc3 100644
--- a/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java
@@ -18,7 +18,6 @@
*/
package com.datatorrent.lib.partitioner;
-import com.datatorrent.api.*;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -29,6 +28,13 @@ import org.junit.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
import com.datatorrent.lib.util.TestUtils;
/**
@@ -150,8 +156,8 @@ public class StatelessThroughputBasedPartitionerTest
partitions.clear();
partitions.add(mockPartition);
- Collection<Partitioner.Partition<DummyOperator>> newPartitions = statelessLatencyBasedPartitioner.definePartitions(partitions,
- new StatelessPartitionerTest.PartitioningContextImpl(ports, 5));
+ Collection<Partitioner.Partition<DummyOperator>> newPartitions = statelessLatencyBasedPartitioner.definePartitions(
+ partitions, new StatelessPartitionerTest.PartitioningContextImpl(ports, 5));
Assert.assertEquals("after partition", 2, newPartitions.size());
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java b/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java
index f3f69fb..3ad30df 100644
--- a/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java
@@ -56,7 +56,7 @@ public class JavaScriptOperatorTest
// Validate value.
Assert.assertEquals("number emitted tuples", 1, sink.collectedTuples.size());
for (Object o : sink.collectedTuples) { // count is 12
- Assert.assertEquals("4.0 is expected", (Double) o, 4.0, 0);
+ Assert.assertEquals("4.0 is expected", (Double)o, 4.0, 0);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java b/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java
index 1cf89ff..47fa2c2 100644
--- a/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java
@@ -18,12 +18,12 @@
*/
package com.datatorrent.lib.statistics;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-
import org.junit.Assert;
import org.junit.Test;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
/**
* Functional Test for {@link com.datatorrent.lib.statistics.WeightedMeanOperator}. <br>
*/
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java b/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java
index 158fed5..26e94c6 100644
--- a/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java
@@ -18,12 +18,12 @@
*/
package com.datatorrent.lib.statistics;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-
import org.junit.Assert;
import org.junit.Test;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
/**
* Functional Test for {@link com.datatorrent.lib.statistics.WeightedMeanOperator}. <br>
*/
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java b/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java
index e3788c2..f9589db 100644
--- a/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java
@@ -18,12 +18,12 @@
*/
package com.datatorrent.lib.statistics;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-
import org.junit.Assert;
import org.junit.Test;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
/**
* Functional Test for {@link com.datatorrent.lib.statistics.WeightedMeanOperator}. <br>
*/
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java b/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java
index 76983f5..13d6097 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java
@@ -30,36 +30,35 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
*/
public class ArrayListAggregatorTest
{
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
- public void testNodeProcessing() throws Exception
- {
- ArrayListAggregator<Integer> oper = new ArrayListAggregator<Integer>();
- CollectorTestSink cSink = new CollectorTestSink();
+ public void testNodeProcessing() throws Exception
+ {
+ ArrayListAggregator<Integer> oper = new ArrayListAggregator<Integer>();
+ CollectorTestSink cSink = new CollectorTestSink();
- oper.output.setSink(cSink);
- oper.setSize(10);
- int numtuples = 100;
+ oper.output.setSink(cSink);
+ oper.setSize(10);
+ int numtuples = 100;
- oper.beginWindow(0);
- for (int i = 0; i < numtuples; i++) {
- oper.input.process(i);
- }
- oper.endWindow();
- Assert.assertEquals("number emitted tuples", 10,
- cSink.collectedTuples.size());
+ oper.beginWindow(0);
+ for (int i = 0; i < numtuples; i++) {
+ oper.input.process(i);
+ }
+ oper.endWindow();
+ Assert.assertEquals("number emitted tuples", 10,
+ cSink.collectedTuples.size());
- cSink.clear();
- oper.setSize(0);
+ cSink.clear();
+ oper.setSize(0);
- oper.beginWindow(1);
- for (int i = 0; i < numtuples; i++) {
- oper.input.process(i);
- }
- oper.endWindow();
- Assert.assertEquals("number emitted tuples", 1,
- cSink.collectedTuples.size());
- ArrayList<?> list = (ArrayList<?>) cSink.collectedTuples.get(0);
- Assert.assertEquals("number emitted tuples", numtuples, list.size());
- }
+ oper.beginWindow(1);
+ for (int i = 0; i < numtuples; i++) {
+ oper.input.process(i);
+ }
+ oper.endWindow();
+ Assert.assertEquals("number emitted tuples", 1, cSink.collectedTuples.size());
+ ArrayList<?> list = (ArrayList<?>)cSink.collectedTuples.get(0);
+ Assert.assertEquals("number emitted tuples", numtuples, list.size());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java b/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java
index fe0edc8..6f1504d 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java
@@ -30,29 +30,29 @@ import com.datatorrent.lib.testbench.CountTestSink;
* Benchmarks: Currently does about ?? Million tuples/sec in debugging environment. Need to test on larger nodes<br>
* <br>
*/
-public class ArrayListToItemTest {
-
- /**
- * Test operator pass through. The Object passed is not relevant
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testNodeProcessing() throws Exception
- {
- ArrayListToItem oper = new ArrayListToItem();
- CountTestSink itemSink = new CountTestSink();
- oper.item.setSink(itemSink);
+public class ArrayListToItemTest
+{
+ /**
+ * Test operator pass through. The Object passed is not relevant
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Test
+ public void testNodeProcessing() throws Exception
+ {
+ ArrayListToItem oper = new ArrayListToItem();
+ CountTestSink itemSink = new CountTestSink();
+ oper.item.setSink(itemSink);
- oper.beginWindow(0);
- ArrayList<String> input = new ArrayList<String>();
- input.add("a");
- // Same input object can be used as the oper is just pass through
- int numtuples = 1000;
- for (int i = 0; i < numtuples; i++) {
- oper.data.process(input);
- }
-
- oper.endWindow();
- Assert.assertEquals("number emitted tuples", numtuples, itemSink.count);
+ oper.beginWindow(0);
+ ArrayList<String> input = new ArrayList<String>();
+ input.add("a");
+ // Same input object can be used as the oper is just pass through
+ int numtuples = 1000;
+ for (int i = 0; i < numtuples; i++) {
+ oper.data.process(input);
}
+
+ oper.endWindow();
+ Assert.assertEquals("number emitted tuples", numtuples, itemSink.count);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java b/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java
index 697ae34..de419fd 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java
@@ -32,25 +32,25 @@ import com.datatorrent.lib.util.KeyValPair;
*/
public class ConsolidatorKeyValTest
{
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Test
- public void testNodeProcessing() throws Exception
- {
- ConsolidatorKeyVal<String, Integer, Double, Integer, Integer, Integer> oper =
- new ConsolidatorKeyVal<String, Integer, Double, Integer, Integer, Integer>();
- CollectorTestSink cSink = new CollectorTestSink();
- oper.out.setSink(cSink);
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testNodeProcessing() throws Exception
+ {
+ ConsolidatorKeyVal<String, Integer, Double, Integer, Integer, Integer> oper =
+ new ConsolidatorKeyVal<String, Integer, Double, Integer, Integer, Integer>();
+ CollectorTestSink cSink = new CollectorTestSink();
+ oper.out.setSink(cSink);
- oper.beginWindow(0);
- KeyValPair<String, Integer> m1 = new KeyValPair<String, Integer>("a",1);
- oper.in1.process(m1);
- KeyValPair<String, Double> m2 = new KeyValPair<String, Double>("a",1.0);
- oper.in2.process(m2);
- oper.endWindow();
- Assert.assertEquals("number emitted tuples", 1, cSink.collectedTuples.size());
+ oper.beginWindow(0);
+ KeyValPair<String, Integer> m1 = new KeyValPair<String, Integer>("a",1);
+ oper.in1.process(m1);
+ KeyValPair<String, Double> m2 = new KeyValPair<String, Double>("a",1.0);
+ oper.in2.process(m2);
+ oper.endWindow();
+ Assert.assertEquals("number emitted tuples", 1, cSink.collectedTuples.size());
- HashMap<String, ArrayList<Object>> map = (HashMap<String, ArrayList<Object>>) cSink.collectedTuples.get(0);
- Assert.assertEquals("size of sink map", 1, map.size());
- }
+ HashMap<String, ArrayList<Object>> map = (HashMap<String, ArrayList<Object>>)cSink.collectedTuples.get(0);
+ Assert.assertEquals("size of sink map", 1, map.size());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java b/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java
index c9837cf..dc73a75 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java
@@ -27,33 +27,34 @@ import com.datatorrent.lib.testbench.CountTestSink;
* Functional test for {@link com.datatorrent.lib.stream.Counter}<p>
* <br>
*/
-public class CounterTest {
-
- /**
- * Test oper pass through. The Object passed is not relevant
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testNodeProcessing() throws Exception
- {
- Counter oper = new Counter();
- CountTestSink cSink = new CountTestSink();
-
- oper.output.setSink(cSink);
- int numtuples = 100;
-
- oper.beginWindow(0);
- for (int i = 0; i < numtuples; i++) {
- oper.input.process(i);
- }
- oper.endWindow();
-
- oper.beginWindow(1);
- for (int i = 0; i < numtuples; i++) {
- oper.input.process(i);
- }
- oper.endWindow();
-
- Assert.assertEquals("number emitted tuples", 2, cSink.getCount());
+public class CounterTest
+{
+
+ /**
+ * Test oper pass through. The Object passed is not relevant
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Test
+ public void testNodeProcessing() throws Exception
+ {
+ Counter oper = new Counter();
+ CountTestSink cSink = new CountTestSink();
+
+ oper.output.setSink(cSink);
+ int numtuples = 100;
+
+ oper.beginWindow(0);
+ for (int i = 0; i < numtuples; i++) {
+ oper.input.process(i);
}
+ oper.endWindow();
+
+ oper.beginWindow(1);
+ for (int i = 0; i < numtuples; i++) {
+ oper.input.process(i);
+ }
+ oper.endWindow();
+
+ Assert.assertEquals("number emitted tuples", 2, cSink.getCount());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java b/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java
index 0cb5f42..6266787 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java
@@ -18,13 +18,12 @@
*/
package com.datatorrent.lib.stream;
-import com.datatorrent.lib.stream.DevNullCounter;
-import com.datatorrent.lib.testbench.EventGenerator;
-
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.lib.testbench.EventGenerator;
+
/**
*
* Functional tests for {@link com.datatorrent.lib.testbench.DevNullCounter}.
@@ -41,29 +40,26 @@ import org.slf4j.LoggerFactory;
public class DevNullCounterTest
{
- private static Logger LOG = LoggerFactory.getLogger(EventGenerator.class);
+ private static Logger LOG = LoggerFactory.getLogger(EventGenerator.class);
- /**
- * Tests both string and non string schema
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSingleSchemaNodeProcessing() throws Exception
- {
- DevNullCounter oper = new DevNullCounter();
- oper.setRollingwindowcount(5);
- oper.setup(null);
+ /**
+ * Tests both string and non string schema
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testSingleSchemaNodeProcessing() throws Exception
+ {
+ DevNullCounter oper = new DevNullCounter();
+ oper.setRollingwindowcount(5);
+ oper.setup(null);
- oper.beginWindow(0);
- long numtuples = 1000000;
- Object o = new Object();
- for (long i = 0; i < numtuples; i++) {
- oper.data.process(o);
- }
- oper.endWindow();
- LOG.info(String
- .format(
- "\n*******************************************************\nnumtuples(%d)",
- numtuples));
- }
+ oper.beginWindow(0);
+ long numtuples = 1000000;
+ Object o = new Object();
+ for (long i = 0; i < numtuples; i++) {
+ oper.data.process(o);
+ }
+ oper.endWindow();
+ LOG.info(String.format("\n*******************************************************\nnumtuples(%d)", numtuples));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java b/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java
index 4695d69..3bd9c11 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java
@@ -18,26 +18,25 @@
*/
package com.datatorrent.lib.stream;
-import com.datatorrent.lib.stream.DevNull;
-import com.datatorrent.lib.testbench.EventGenerator;
-
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.lib.testbench.EventGenerator;
+
/**
* Functional tests for {@link com.datatorrent.lib.testbench.DevNull}.
*/
-public class DevNullTest {
-
- private static Logger LOG = LoggerFactory.getLogger(EventGenerator.class);
+public class DevNullTest
+{
+ private static Logger LOG = LoggerFactory.getLogger(EventGenerator.class);
/**
* Tests both string and non string schema
*/
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Test
public void testSingleSchemaNodeProcessing() throws Exception
{
DevNull oper = new DevNull();
@@ -49,6 +48,6 @@ public class DevNullTest {
oper.data.process(o);
}
oper.endWindow();
- LOG.info(String.format("\n*******************************************************\nnumtuples(%d)", numtuples));
+ LOG.info(String.format("\n*******************************************************\nnumtuples(%d)", numtuples));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java b/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java
index 99305e3..314eb01 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java
@@ -31,35 +31,35 @@ import com.datatorrent.lib.testbench.CountTestSink;
public class HashMapToKeyValPairTest
{
- /**
- * Test oper pass through. The Object passed is not relevant
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testNodeProcessing() throws Exception
- {
- HashMapToKeyValPair oper = new HashMapToKeyValPair();
- CountTestSink keySink = new CountTestSink();
- CountTestSink valSink = new CountTestSink();
- CountTestSink keyvalSink = new CountTestSink();
+ /**
+ * Test oper pass through. The Object passed is not relevant
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testNodeProcessing() throws Exception
+ {
+ HashMapToKeyValPair oper = new HashMapToKeyValPair();
+ CountTestSink keySink = new CountTestSink();
+ CountTestSink valSink = new CountTestSink();
+ CountTestSink keyvalSink = new CountTestSink();
- oper.key.setSink(keySink);
- oper.val.setSink(valSink);
- oper.keyval.setSink(keyvalSink);
+ oper.key.setSink(keySink);
+ oper.val.setSink(valSink);
+ oper.keyval.setSink(keyvalSink);
- oper.beginWindow(0);
- HashMap<String, String> input = new HashMap<String, String>();
- input.put("a", "1");
- // Same input object can be used as the oper is just pass through
- int numtuples = 1000;
- for (int i = 0; i < numtuples; i++) {
- oper.data.process(input);
- }
+ oper.beginWindow(0);
+ HashMap<String, String> input = new HashMap<String, String>();
+ input.put("a", "1");
+ // Same input object can be used as the oper is just pass through
+ int numtuples = 1000;
+ for (int i = 0; i < numtuples; i++) {
+ oper.data.process(input);
+ }
- oper.endWindow();
+ oper.endWindow();
- Assert.assertEquals("number emitted tuples", numtuples, keySink.count);
- Assert.assertEquals("number emitted tuples", numtuples, valSink.count);
- Assert.assertEquals("number emitted tuples", numtuples, keyvalSink.count);
- }
+ Assert.assertEquals("number emitted tuples", numtuples, keySink.count);
+ Assert.assertEquals("number emitted tuples", numtuples, valSink.count);
+ Assert.assertEquals("number emitted tuples", numtuples, keyvalSink.count);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java b/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java
index a46eee7..e376c95 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java
@@ -32,76 +32,75 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
*/
public class JsonByteArrayOperatorTest
{
- /**
- * Test json byte array to HashMap operator pass through. The Object passed is not relevant
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testOperator() throws Exception
- {
- JsonByteArrayOperator oper = new JsonByteArrayOperator();
- oper.setConcatenationCharacter('.');
-
- CollectorTestSink mapSink = new CollectorTestSink();
- CollectorTestSink jsonObjectSink = new CollectorTestSink();
- CollectorTestSink flatMapSink = new CollectorTestSink();
-
- oper.outputMap.setSink(mapSink);
- oper.outputJsonObject.setSink(jsonObjectSink);
- oper.outputFlatMap.setSink(flatMapSink);
-
- oper.beginWindow(0);
-
- // input test json string
- String inputJson = " { \"@timestamp\":\"2013-09-25T19:37:23.569Z\""
- + " ,\"@version\":\"1\""
- + " ,\"type\":\"apache-logs\""
- + " ,\"host\":\"node1001\""
- + " ,\"clientip\":192.168.150.120"
- + " ,\"verb\":\"GET\""
- + " ,\"request\":\"/reset.css\""
- + " ,\"httpversion\":\"1.1\""
- + " ,\"response\":200"
- + " ,\"agentinfo\": {\"browser\":Firefox"
- + " ,\"os\": { \"name\":\"Ubuntu\""
- + " ,\"version\":\"10.04\""
- + " }"
- + " }"
- + " ,\"bytes\":909.1"
- + " }";
-
- byte[] inputByteArray = inputJson.getBytes();
-
- // run the operator for the same string 1000 times
- int numtuples = 1000;
- for (int i = 0; i < numtuples; i++) {
- oper.input.process(inputByteArray);
- }
-
- oper.endWindow();
-
- // assert that the number of the operator generates is 1000
- Assert.assertEquals("number emitted tuples", numtuples, mapSink.collectedTuples.size());
- Assert.assertEquals("number emitted tuples", numtuples, jsonObjectSink.collectedTuples.size());
- Assert.assertEquals("number emitted tuples", numtuples, flatMapSink.collectedTuples.size());
-
- // assert that value for one of the keys in any one of the objects from mapSink is as expected
- Object map = mapSink.collectedTuples.get(510);
- String expectedClientip = "192.168.150.120";
- Assert.assertEquals("emitted tuple", expectedClientip, ((Map)map).get("clientip"));
-
-
- // assert that value for one of the keys in any one of the objects from jsonObjectSink is as expected
- Object jsonObject = jsonObjectSink.collectedTuples.get(433);
- Number expectedResponse = 200;
- Assert.assertEquals("emitted tuple", expectedResponse, ((JSONObject)jsonObject).get("response"));
-
- // assert that value for one of the keys in any one of the objects from flatMapSink is as expected
- Map flatMap = (Map)flatMapSink.collectedTuples.get(511);
- String expectedBrowser = "Firefox";
- String expectedOsName = "Ubuntu";
- Assert.assertEquals("emitted tuple", expectedBrowser, flatMap.get("agentinfo.browser"));
- Assert.assertEquals("emitted tuple", expectedOsName, flatMap.get("agentinfo.os.name"));
+ /**
+ * Test json byte array to HashMap operator pass through. The Object passed is not relevant
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Test
+ public void testOperator() throws Exception
+ {
+ JsonByteArrayOperator oper = new JsonByteArrayOperator();
+ oper.setConcatenationCharacter('.');
+
+ CollectorTestSink mapSink = new CollectorTestSink();
+ CollectorTestSink jsonObjectSink = new CollectorTestSink();
+ CollectorTestSink flatMapSink = new CollectorTestSink();
+
+ oper.outputMap.setSink(mapSink);
+ oper.outputJsonObject.setSink(jsonObjectSink);
+ oper.outputFlatMap.setSink(flatMapSink);
+
+ oper.beginWindow(0);
+
+ // input test json string
+ String inputJson = " { \"@timestamp\":\"2013-09-25T19:37:23.569Z\""
+ + " ,\"@version\":\"1\""
+ + " ,\"type\":\"apache-logs\""
+ + " ,\"host\":\"node1001\""
+ + " ,\"clientip\":192.168.150.120"
+ + " ,\"verb\":\"GET\""
+ + " ,\"request\":\"/reset.css\""
+ + " ,\"httpversion\":\"1.1\""
+ + " ,\"response\":200"
+ + " ,\"agentinfo\": {\"browser\":Firefox"
+ + " ,\"os\": { \"name\":\"Ubuntu\""
+ + " ,\"version\":\"10.04\""
+ + " }"
+ + " }"
+ + " ,\"bytes\":909.1"
+ + " }";
+
+ byte[] inputByteArray = inputJson.getBytes();
+
+ // run the operator for the same string 1000 times
+ int numtuples = 1000;
+ for (int i = 0; i < numtuples; i++) {
+ oper.input.process(inputByteArray);
}
+ oper.endWindow();
+
+ // assert that the number of the operator generates is 1000
+ Assert.assertEquals("number emitted tuples", numtuples, mapSink.collectedTuples.size());
+ Assert.assertEquals("number emitted tuples", numtuples, jsonObjectSink.collectedTuples.size());
+ Assert.assertEquals("number emitted tuples", numtuples, flatMapSink.collectedTuples.size());
+
+ // assert that value for one of the keys in any one of the objects from mapSink is as expected
+ Object map = mapSink.collectedTuples.get(510);
+ String expectedClientip = "192.168.150.120";
+ Assert.assertEquals("emitted tuple", expectedClientip, ((Map)map).get("clientip"));
+
+ // assert that value for one of the keys in any one of the objects from jsonObjectSink is as expected
+ Object jsonObject = jsonObjectSink.collectedTuples.get(433);
+ Number expectedResponse = 200;
+ Assert.assertEquals("emitted tuple", expectedResponse, ((JSONObject)jsonObject).get("response"));
+
+ // assert that value for one of the keys in any one of the objects from flatMapSink is as expected
+ Map flatMap = (Map)flatMapSink.collectedTuples.get(511);
+ String expectedBrowser = "Firefox";
+ String expectedOsName = "Ubuntu";
+ Assert.assertEquals("emitted tuple", expectedBrowser, flatMap.get("agentinfo.browser"));
+ Assert.assertEquals("emitted tuple", expectedOsName, flatMap.get("agentinfo.os.name"));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java b/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java
index 2b5a583..2d0595f 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java
@@ -32,28 +32,28 @@ import com.datatorrent.lib.util.KeyValPair;
public class KeyPairToHashMapTest
{
- /**
- * Test oper pass through. The Object passed is not relevant
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Test
- public void testNodeProcessing() throws Exception
- {
- KeyValPairToHashMap oper = new KeyValPairToHashMap();
- CountTestSink mapSink = new CountTestSink();
-
- oper.map.setSink(mapSink);
-
- oper.beginWindow(0);
- KeyValPair<String, String> input = new KeyValPair<String, String>("a", "1");
-
- // Same input object can be used as the oper is just pass through
- int numtuples = 1000;
- for (int i = 0; i < numtuples; i++) {
- oper.keyval.process(input);
- }
- oper.endWindow();
-
- Assert.assertEquals("number emitted tuples", numtuples, mapSink.count);
- }
+ /**
+ * Test oper pass through. The Object passed is not relevant
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testNodeProcessing() throws Exception
+ {
+ KeyValPairToHashMap oper = new KeyValPairToHashMap();
+ CountTestSink mapSink = new CountTestSink();
+
+ oper.map.setSink(mapSink);
+
+ oper.beginWindow(0);
+ KeyValPair<String, String> input = new KeyValPair<String, String>("a", "1");
+
+ // Same input object can be used as the oper is just pass through
+ int numtuples = 1000;
+ for (int i = 0; i < numtuples; i++) {
+ oper.keyval.process(input);
+ }
+ oper.endWindow();
+
+ Assert.assertEquals("number emitted tuples", numtuples, mapSink.count);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java b/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java
index c26c8d0..d78e369 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java
@@ -33,59 +33,59 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
public class RoundRobinHashMapTest
{
- private static Logger log = LoggerFactory
- .getLogger(RoundRobinHashMapTest.class);
+ private static Logger log = LoggerFactory
+ .getLogger(RoundRobinHashMapTest.class);
- /**
- * Test operator pass through. The Object passed is not relevant
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testNodeProcessing() throws Exception
- {
- RoundRobinHashMap oper = new RoundRobinHashMap();
- CollectorTestSink mapSink = new CollectorTestSink();
+ /**
+ * Test operator pass through. The Object passed is not relevant
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testNodeProcessing() throws Exception
+ {
+ RoundRobinHashMap oper = new RoundRobinHashMap();
+ CollectorTestSink mapSink = new CollectorTestSink();
- String[] keys = new String[3];
- keys[0] = "a";
- keys[1] = "b";
- keys[2] = "c";
+ String[] keys = new String[3];
+ keys[0] = "a";
+ keys[1] = "b";
+ keys[2] = "c";
- oper.setKeys(keys);
- oper.map.setSink(mapSink);
- oper.beginWindow(0);
+ oper.setKeys(keys);
+ oper.map.setSink(mapSink);
+ oper.beginWindow(0);
- HashMap<String, Integer> t1 = new HashMap<String, Integer>();
- t1.put("a", 0);
- t1.put("b", 1);
- t1.put("c", 2);
- HashMap<String, Integer> t2 = new HashMap<String, Integer>();
- t2.put("a", 3);
- t2.put("b", 4);
- t2.put("c", 5);
- HashMap<String, Integer> t3 = new HashMap<String, Integer>();
- t3.put("a", 6);
- t3.put("b", 7);
- t3.put("c", 8);
+ HashMap<String, Integer> t1 = new HashMap<String, Integer>();
+ t1.put("a", 0);
+ t1.put("b", 1);
+ t1.put("c", 2);
+ HashMap<String, Integer> t2 = new HashMap<String, Integer>();
+ t2.put("a", 3);
+ t2.put("b", 4);
+ t2.put("c", 5);
+ HashMap<String, Integer> t3 = new HashMap<String, Integer>();
+ t3.put("a", 6);
+ t3.put("b", 7);
+ t3.put("c", 8);
- HashMap<String, Integer> t4 = new HashMap<String, Integer>();
- t4.put("a", 9);
- t4.put("b", 10);
- t4.put("c", 11);
+ HashMap<String, Integer> t4 = new HashMap<String, Integer>();
+ t4.put("a", 9);
+ t4.put("b", 10);
+ t4.put("c", 11);
- // Same input object can be used as the oper is just pass through
- int numtuples = 12;
- for (int i = 0; i < numtuples; i++) {
- oper.data.process(i);
- }
- oper.endWindow();
+ // Same input object can be used as the oper is just pass through
+ int numtuples = 12;
+ for (int i = 0; i < numtuples; i++) {
+ oper.data.process(i);
+ }
+ oper.endWindow();
- Assert.assertEquals("number emitted tuples", numtuples / 3,
- mapSink.collectedTuples.size());
- log.debug(mapSink.collectedTuples.toString());
- Assert.assertEquals("tuple 1", t1, mapSink.collectedTuples.get(0));
- Assert.assertEquals("tuple 2", t2, mapSink.collectedTuples.get(1));
- Assert.assertEquals("tuple 3", t3, mapSink.collectedTuples.get(2));
- Assert.assertEquals("tuple 4", t4, mapSink.collectedTuples.get(3));
- }
+ Assert.assertEquals("number emitted tuples", numtuples / 3,
+ mapSink.collectedTuples.size());
+ log.debug(mapSink.collectedTuples.toString());
+ Assert.assertEquals("tuple 1", t1, mapSink.collectedTuples.get(0));
+ Assert.assertEquals("tuple 2", t2, mapSink.collectedTuples.get(1));
+ Assert.assertEquals("tuple 3", t3, mapSink.collectedTuples.get(2));
+ Assert.assertEquals("tuple 4", t4, mapSink.collectedTuples.get(3));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java b/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java
index 1c05b6c..7586c70 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java
@@ -28,34 +28,35 @@ import com.datatorrent.lib.testbench.CountTestSink;
* Benchmarks: Currently does about ?? Million tuples/sec in debugging environment. Need to test on larger nodes<br>
* <br>
*/
-public class StreamDuplicaterTest {
-
- /**
- * Test oper pass through. The Object passed is not relevant
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testNodeProcessing() throws Exception
- {
- StreamDuplicater oper = new StreamDuplicater();
- CountTestSink mergeSink1 = new CountTestSink();
- CountTestSink mergeSink2 = new CountTestSink();
-
- oper.out1.setSink(mergeSink1);
- oper.out2.setSink(mergeSink2);
-
- oper.beginWindow(0);
- int numtuples = 1000;
- Integer input = new Integer(0);
- // Same input object can be used as the oper is just pass through
- for (int i = 0; i < numtuples; i++) {
- oper.data.process(input);
- }
-
- oper.endWindow();
-
- // One for each key
- Assert.assertEquals("number emitted tuples", numtuples, mergeSink1.count);
- Assert.assertEquals("number emitted tuples", numtuples, mergeSink2.count);
+public class StreamDuplicaterTest
+{
+
+ /**
+ * Test oper pass through. The Object passed is not relevant
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Test
+ public void testNodeProcessing() throws Exception
+ {
+ StreamDuplicater oper = new StreamDuplicater();
+ CountTestSink mergeSink1 = new CountTestSink();
+ CountTestSink mergeSink2 = new CountTestSink();
+
+ oper.out1.setSink(mergeSink1);
+ oper.out2.setSink(mergeSink2);
+
+ oper.beginWindow(0);
+ int numtuples = 1000;
+ Integer input = 0;
+ // Same input object can be used as the oper is just pass through
+ for (int i = 0; i < numtuples; i++) {
+ oper.data.process(input);
}
+
+ oper.endWindow();
+
+ // One for each key
+ Assert.assertEquals("number emitted tuples", numtuples, mergeSink1.count);
+ Assert.assertEquals("number emitted tuples", numtuples, mergeSink2.count);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java b/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java
index fe626f4..995dc6f 100644
--- a/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java
@@ -43,7 +43,7 @@ public class StreamMergerTest
oper.beginWindow(0);
int numtuples = 500;
- Integer input = new Integer(0);
+ Integer input = 0;
// Same input object can be used as the oper is just pass through
for (int i = 0; i < numtuples; i++) {
oper.data1.process(input);
@@ -51,6 +51,6 @@ public class StreamMergerTest
}
oper.endWindow();
- Assert.assertEquals("number emitted tuples", numtuples*2, mergeSink.count);
+ Assert.assertEquals("number emitted tuples", numtuples * 2, mergeSink.count);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
index b0d3c01..1f29d1d 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
@@ -21,8 +21,9 @@ package com.datatorrent.lib.streamquery;
import java.util.HashMap;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.datatorrent.lib.streamquery.DeleteOperator;
import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -31,44 +32,46 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
*/
public class DeleteOperatorTest
{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
public void testSqlSelect()
{
- // create operator
- DeleteOperator oper = new DeleteOperator();
-
- EqualValueCondition condition = new EqualValueCondition();
- condition.addEqualValue("a", 1);
- oper.setCondition(condition);
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- System.out.println(sink.collectedTuples.toString());
+ // create operator
+ DeleteOperator oper = new DeleteOperator();
+
+ EqualValueCondition condition = new EqualValueCondition();
+ condition.addEqualValue("a", 1);
+ oper.setCondition(condition);
+
+ CollectorTestSink sink = new CollectorTestSink();
+ oper.outport.setSink(sink);
+
+ oper.setup(null);
+ oper.beginWindow(1);
+
+ HashMap<String, Object> tuple = new HashMap<String, Object>();
+ tuple.put("a", 0);
+ tuple.put("b", 1);
+ tuple.put("c", 2);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 3);
+ tuple.put("c", 4);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 5);
+ tuple.put("c", 6);
+ oper.inport.process(tuple);
+
+ oper.endWindow();
+ oper.teardown();
+
+ LOG.debug("{}", sink.collectedTuples);
}
+
+ private static final Logger LOG = LoggerFactory.getLogger(DeleteOperatorTest.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
index 97d587b..728fb96 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
import java.util.HashMap;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.lib.streamquery.condition.Condition;
import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
@@ -29,16 +31,16 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
public class FullOuterJoinOperatorTest
{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
public void testSqlSelect()
{
- // create operator
- OuterJoinOperator oper = new OuterJoinOperator();
- oper.setFullJoin(true);
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
+ // create operator
+ OuterJoinOperator oper = new OuterJoinOperator();
+ oper.setFullJoin(true);
+ CollectorTestSink sink = new CollectorTestSink();
+ oper.outport.setSink(sink);
+
// set column join condition
Condition cond = new JoinColumnEqualCondition("a", "a");
oper.setJoinCondition(cond);
@@ -46,43 +48,46 @@ public class FullOuterJoinOperatorTest
// add columns
oper.selectTable1Column(new ColumnIndex("b", null));
oper.selectTable2Column(new ColumnIndex("c", null));
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 2);
- tuple.put("b", 11);
- tuple.put("c", 12);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 7);
- tuple.put("c", 8);
- oper.inport2.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport2.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- System.out.println(sink.collectedTuples.toString());
+
+ oper.setup(null);
+ oper.beginWindow(1);
+
+ HashMap<String, Object> tuple = new HashMap<String, Object>();
+ tuple.put("a", 0);
+ tuple.put("b", 1);
+ tuple.put("c", 2);
+ oper.inport1.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 3);
+ tuple.put("c", 4);
+ oper.inport1.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 2);
+ tuple.put("b", 11);
+ tuple.put("c", 12);
+ oper.inport1.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 0);
+ tuple.put("b", 7);
+ tuple.put("c", 8);
+ oper.inport2.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 5);
+ tuple.put("c", 6);
+ oper.inport2.process(tuple);
+
+ oper.endWindow();
+ oper.teardown();
+
+ LOG.debug("{}", sink.collectedTuples);
}
+
+ private static final Logger LOG = LoggerFactory.getLogger(FullOuterJoinOperatorTest.class);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
index 0d4c939..714f93b 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
import java.util.HashMap;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
import com.datatorrent.lib.streamquery.function.SumFunction;
@@ -32,58 +34,61 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
*/
public class GroupByOperatorTest
{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
public void testSqlGroupBy()
{
- // create operator
- GroupByHavingOperator oper = new GroupByHavingOperator();
- oper.addColumnGroupByIndex(new ColumnIndex("b", null));
- try {
+ // create operator
+ GroupByHavingOperator oper = new GroupByHavingOperator();
+ oper.addColumnGroupByIndex(new ColumnIndex("b", null));
+ try {
oper.addAggregateIndex(new SumFunction("c", null));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
return;
}
-
- EqualValueCondition condition = new EqualValueCondition();
- condition.addEqualValue("a", 1);
- oper.setCondition(condition);
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 1);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 2);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
+
+ EqualValueCondition condition = new EqualValueCondition();
+ condition.addEqualValue("a", 1);
+ oper.setCondition(condition);
+
+ CollectorTestSink sink = new CollectorTestSink();
+ oper.outport.setSink(sink);
+
+ oper.setup(null);
+ oper.beginWindow(1);
+
+ HashMap<String, Object> tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 1);
+ tuple.put("c", 2);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 1);
+ tuple.put("c", 4);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 2);
+ tuple.put("c", 6);
+ oper.inport.process(tuple);
+
tuple = new HashMap<String, Object>();
tuple.put("a", 1);
tuple.put("b", 2);
tuple.put("c", 7);
oper.inport.process(tuple);
- oper.endWindow();
- oper.teardown();
-
- System.out.println(sink.collectedTuples.toString());
+ oper.endWindow();
+ oper.teardown();
+
+ LOG.debug("{}", sink.collectedTuples);
}
+
+ private static final Logger LOG = LoggerFactory.getLogger(GroupByOperatorTest.class);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
index 7ccb2ed..e11723d 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
import java.util.HashMap;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
import com.datatorrent.lib.streamquery.condition.HavingCompareValue;
@@ -35,57 +37,60 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
*/
public class HavingOperatorTest
{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
public void testSqlGroupBy() throws Exception
{
- // create operator
- GroupByHavingOperator oper = new GroupByHavingOperator();
- oper.addColumnGroupByIndex(new ColumnIndex("b", null));
- FunctionIndex sum = new SumFunction("c", null);
+ // create operator
+ GroupByHavingOperator oper = new GroupByHavingOperator();
+ oper.addColumnGroupByIndex(new ColumnIndex("b", null));
+ FunctionIndex sum = new SumFunction("c", null);
oper.addAggregateIndex(sum);
// create having condition
HavingCondition having = new HavingCompareValue<Double>(sum, 6.0, 0);
oper.addHavingCondition(having);
-
- EqualValueCondition condition = new EqualValueCondition();
- condition.addEqualValue("a", 1);
- oper.setCondition(condition);
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 1);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 2);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
+
+ EqualValueCondition condition = new EqualValueCondition();
+ condition.addEqualValue("a", 1);
+ oper.setCondition(condition);
+
+ CollectorTestSink sink = new CollectorTestSink();
+ oper.outport.setSink(sink);
+
+ oper.setup(null);
+ oper.beginWindow(1);
+
+ HashMap<String, Object> tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 1);
+ tuple.put("c", 2);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 1);
+ tuple.put("c", 4);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 2);
+ tuple.put("c", 6);
+ oper.inport.process(tuple);
+
tuple = new HashMap<String, Object>();
tuple.put("a", 1);
tuple.put("b", 2);
tuple.put("c", 7);
oper.inport.process(tuple);
- oper.endWindow();
- oper.teardown();
-
- System.out.println(sink.collectedTuples.toString());
+ oper.endWindow();
+ oper.teardown();
+
+ LOG.debug("{}", sink.collectedTuples);
}
+
+ private static final Logger LOG = LoggerFactory.getLogger(HavingOperatorTest.class);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
index 2f14f16..8a022ee 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
import java.util.HashMap;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.lib.streamquery.condition.Condition;
import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
@@ -34,53 +36,56 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
*/
public class InnerJoinOperatorTest
{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
public void testSqlSelect()
{
- // create operator
- InnerJoinOperator oper = new InnerJoinOperator();
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- // set column join condition
- Condition cond = new JoinColumnEqualCondition("a", "a");
- oper.setJoinCondition(cond);
-
- // add columns
- oper.selectTable1Column(new ColumnIndex("b", null));
- oper.selectTable2Column(new ColumnIndex("c", null));
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 7);
- tuple.put("c", 8);
- oper.inport2.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport2.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- System.out.println(sink.collectedTuples.toString());
+ // create operator
+ InnerJoinOperator oper = new InnerJoinOperator();
+ CollectorTestSink sink = new CollectorTestSink();
+ oper.outport.setSink(sink);
+
+ // set column join condition
+ Condition cond = new JoinColumnEqualCondition("a", "a");
+ oper.setJoinCondition(cond);
+
+ // add columns
+ oper.selectTable1Column(new ColumnIndex("b", null));
+ oper.selectTable2Column(new ColumnIndex("c", null));
+
+ oper.setup(null);
+ oper.beginWindow(1);
+
+ HashMap<String, Object> tuple = new HashMap<String, Object>();
+ tuple.put("a", 0);
+ tuple.put("b", 1);
+ tuple.put("c", 2);
+ oper.inport1.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 3);
+ tuple.put("c", 4);
+ oper.inport1.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 0);
+ tuple.put("b", 7);
+ tuple.put("c", 8);
+ oper.inport2.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 5);
+ tuple.put("c", 6);
+ oper.inport2.process(tuple);
+
+ oper.endWindow();
+ oper.teardown();
+
+ LOG.debug("{}", sink.collectedTuples);
}
+
+ private static final Logger LOG = LoggerFactory.getLogger(InnerJoinOperatorTest.class);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
index 32e5b13..aa25e87 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
import java.util.HashMap;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.lib.streamquery.condition.Condition;
import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
@@ -29,15 +31,15 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
public class LeftOuterJoinOperatorTest
{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
public void testSqlSelect()
{
- // create operator
- OuterJoinOperator oper = new OuterJoinOperator();
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
+ // create operator
+ OuterJoinOperator oper = new OuterJoinOperator();
+ CollectorTestSink sink = new CollectorTestSink();
+ oper.outport.setSink(sink);
+
// set column join condition
Condition cond = new JoinColumnEqualCondition("a", "a");
oper.setJoinCondition(cond);
@@ -45,43 +47,46 @@ public class LeftOuterJoinOperatorTest
// add columns
oper.selectTable1Column(new ColumnIndex("b", null));
oper.selectTable2Column(new ColumnIndex("c", null));
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 2);
- tuple.put("b", 11);
- tuple.put("c", 12);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 7);
- tuple.put("c", 8);
- oper.inport2.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport2.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- System.out.println(sink.collectedTuples.toString());
+
+ oper.setup(null);
+ oper.beginWindow(1);
+
+ HashMap<String, Object> tuple = new HashMap<String, Object>();
+ tuple.put("a", 0);
+ tuple.put("b", 1);
+ tuple.put("c", 2);
+ oper.inport1.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 3);
+ tuple.put("c", 4);
+ oper.inport1.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 2);
+ tuple.put("b", 11);
+ tuple.put("c", 12);
+ oper.inport1.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 0);
+ tuple.put("b", 7);
+ tuple.put("c", 8);
+ oper.inport2.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 5);
+ tuple.put("c", 6);
+ oper.inport2.process(tuple);
+
+ oper.endWindow();
+ oper.teardown();
+
+ LOG.debug("{}", sink.collectedTuples);
}
+
+ private static final Logger LOG = LoggerFactory.getLogger(LeftOuterJoinOperatorTest.class);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
index b233290..2d7ba87 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
import java.util.HashMap;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -29,60 +31,63 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
*/
public class OrderByOperatorTest
{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
public void testSqlSelect()
{
- // craete operator
+ // craete operator
OrderByOperator oper = new OrderByOperator();
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
- oper.addOrderByRule(new OrderByRule<Integer>("b"));
- oper.setDescending(true);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("c", 2);
- tuple.put("a", 0);
- tuple.put("b", 1);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 2);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 2);
- tuple.put("b", 6);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 4);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 8);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- System.out.println(sink.collectedTuples.toString());
+
+ CollectorTestSink sink = new CollectorTestSink();
+ oper.outport.setSink(sink);
+ oper.addOrderByRule(new OrderByRule<Integer>("b"));
+ oper.setDescending(true);
+
+ oper.setup(null);
+ oper.beginWindow(1);
+
+ HashMap<String, Object> tuple = new HashMap<String, Object>();
+ tuple.put("c", 2);
+ tuple.put("a", 0);
+ tuple.put("b", 1);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 2);
+ tuple.put("b", 5);
+ tuple.put("c", 6);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 2);
+ tuple.put("b", 6);
+ tuple.put("c", 6);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 3);
+ tuple.put("c", 4);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 4);
+ tuple.put("c", 4);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 8);
+ tuple.put("c", 4);
+ oper.inport.process(tuple);
+
+ oper.endWindow();
+ oper.teardown();
+
+ LOG.debug("{}", sink.collectedTuples);
}
+
+ private static final Logger LOG = LoggerFactory.getLogger(OrderByOperatorTest.class);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
index f99ee25..3a57427 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
import java.util.HashMap;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.lib.streamquery.condition.Condition;
import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
@@ -29,16 +31,16 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
public class RightOuterJoinOperatorTest
{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
public void testSqlSelect()
{
- // create operator
- OuterJoinOperator oper = new OuterJoinOperator();
- oper.setRighttJoin();
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
+ // create operator
+ OuterJoinOperator oper = new OuterJoinOperator();
+ oper.setRighttJoin();
+ CollectorTestSink sink = new CollectorTestSink();
+ oper.outport.setSink(sink);
+
// set column join condition
Condition cond = new JoinColumnEqualCondition("a", "a");
oper.setJoinCondition(cond);
@@ -46,44 +48,47 @@ public class RightOuterJoinOperatorTest
// add columns
oper.selectTable1Column(new ColumnIndex("b", null));
oper.selectTable2Column(new ColumnIndex("c", null));
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport1.process(tuple);
-
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 7);
- tuple.put("c", 8);
- oper.inport2.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport2.process(tuple);
-
- tuple = new HashMap<String, Object>();
+
+ oper.setup(null);
+ oper.beginWindow(1);
+
+ HashMap<String, Object> tuple = new HashMap<String, Object>();
+ tuple.put("a", 0);
+ tuple.put("b", 1);
+ tuple.put("c", 2);
+ oper.inport1.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 3);
+ tuple.put("c", 4);
+ oper.inport1.process(tuple);
+
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 0);
+ tuple.put("b", 7);
+ tuple.put("c", 8);
+ oper.inport2.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 5);
+ tuple.put("c", 6);
+ oper.inport2.process(tuple);
+
+ tuple = new HashMap<String, Object>();
tuple.put("a", 2);
tuple.put("b", 11);
tuple.put("c", 12);
oper.inport2.process(tuple);
- oper.endWindow();
- oper.teardown();
-
- System.out.println(sink.collectedTuples.toString());
+ oper.endWindow();
+ oper.teardown();
+
+ LOG.debug("{}", sink.collectedTuples);
}
+
+ private static final Logger LOG = LoggerFactory.getLogger(RightOuterJoinOperatorTest.class);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
index 3ac18f8..8e6620e 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
@@ -21,8 +21,9 @@ package com.datatorrent.lib.streamquery;
import java.util.HashMap;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.datatorrent.lib.streamquery.SelectOperator;
import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
import com.datatorrent.lib.streamquery.index.ColumnIndex;
import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -32,46 +33,49 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
*/
public class SelectOperatorTest
{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
public void testSqlSelect()
{
- // create operator
- SelectOperator oper = new SelectOperator();
- oper.addIndex(new ColumnIndex("b", null));
- oper.addIndex(new ColumnIndex("c", null));
-
- EqualValueCondition condition = new EqualValueCondition();
- condition.addEqualValue("a", 1);
- oper.setCondition(condition);
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- System.out.println(sink.collectedTuples.toString());
+ // create operator
+ SelectOperator oper = new SelectOperator();
+ oper.addIndex(new ColumnIndex("b", null));
+ oper.addIndex(new ColumnIndex("c", null));
+
+ EqualValueCondition condition = new EqualValueCondition();
+ condition.addEqualValue("a", 1);
+ oper.setCondition(condition);
+
+ CollectorTestSink sink = new CollectorTestSink();
+ oper.outport.setSink(sink);
+
+ oper.setup(null);
+ oper.beginWindow(1);
+
+ HashMap<String, Object> tuple = new HashMap<String, Object>();
+ tuple.put("a", 0);
+ tuple.put("b", 1);
+ tuple.put("c", 2);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 3);
+ tuple.put("c", 4);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 5);
+ tuple.put("c", 6);
+ oper.inport.process(tuple);
+
+ oper.endWindow();
+ oper.teardown();
+
+ LOG.debug("{}", sink.collectedTuples);
}
+
+ private static final Logger LOG = LoggerFactory.getLogger(SelectOperatorTest.class);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
index 8c894d1..c92c6c1 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery;
import java.util.HashMap;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -54,7 +56,10 @@ public class SelectTopOperatorTest
tuple.put("c", 6);
oper.inport.process(tuple);
oper.endWindow();
-
- System.out.println(sink.collectedTuples.toString());
+
+ LOG.debug("{}", sink.collectedTuples);
}
+
+ private static final Logger LOG = LoggerFactory.getLogger(SelectTopOperatorTest.class);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
index 70713db..42af56b 100644
--- a/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
@@ -21,52 +21,56 @@ package com.datatorrent.lib.streamquery;
import java.util.HashMap;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.datatorrent.lib.streamquery.UpdateOperator;
import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
import com.datatorrent.lib.testbench.CollectorTestSink;
public class UpdateOperatorTest
{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
public void testSqlSelect()
{
- // create operator
- UpdateOperator oper = new UpdateOperator();
-
- EqualValueCondition condition = new EqualValueCondition();
- condition.addEqualValue("a", 1);
- oper.setCondition(condition);
- oper.addUpdate("c", 100);
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- System.out.println(sink.collectedTuples.toString());
+ // create operator
+ UpdateOperator oper = new UpdateOperator();
+
+ EqualValueCondition condition = new EqualValueCondition();
+ condition.addEqualValue("a", 1);
+ oper.setCondition(condition);
+ oper.addUpdate("c", 100);
+
+ CollectorTestSink sink = new CollectorTestSink();
+ oper.outport.setSink(sink);
+
+ oper.setup(null);
+ oper.beginWindow(1);
+
+ HashMap<String, Object> tuple = new HashMap<String, Object>();
+ tuple.put("a", 0);
+ tuple.put("b", 1);
+ tuple.put("c", 2);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 3);
+ tuple.put("c", 4);
+ oper.inport.process(tuple);
+
+ tuple = new HashMap<String, Object>();
+ tuple.put("a", 1);
+ tuple.put("b", 5);
+ tuple.put("c", 6);
+ oper.inport.process(tuple);
+
+ oper.endWindow();
+ oper.teardown();
+
+ LOG.debug("{}", sink.collectedTuples);
}
+
+ private static final Logger LOG = LoggerFactory.getLogger(UpdateOperatorTest.class);
+
}