You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/01 18:30:46 UTC
[02/12] apex-malhar git commit: Updated algo & working on math
operators
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/AverageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/AverageTest.java b/library/src/test/java/com/datatorrent/lib/math/AverageTest.java
index 1973bec..712bcdc 100644
--- a/library/src/test/java/com/datatorrent/lib/math/AverageTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/AverageTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.math;
import org.junit.Assert;
import org.junit.Test;
+import com.datatorrent.common.util.Pair;
+
import com.datatorrent.lib.testbench.CollectorTestSink;
/**
@@ -96,7 +98,7 @@ public class AverageTest
Assert.assertEquals("number emitted tuples", 1, averageSink.collectedTuples.size());
for (Object o : averageSink.collectedTuples) { // count is 12
- Integer val = ((Number)o).intValue();
+ Number val = ((Pair<? extends Number, Integer>)o).getFirst().intValue();
Assert.assertEquals("emitted average value was was ", new Integer(1157 / 12), val);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java
deleted file mode 100644
index 7d7842e..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import org.junit.Assert;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.ChangeAlertKeyVal}.
- * <p>
- *
- */
-public class ChangeAlertKeyValTest
-{
- private static Logger log = LoggerFactory
- .getLogger(ChangeAlertKeyValTest.class);
-
- /**
- * Test node logic emits correct results.
- */
- @Test
- public void testNodeProcessing() throws Exception
- {
- testNodeProcessingSchema(new ChangeAlertKeyVal<String, Integer>());
- testNodeProcessingSchema(new ChangeAlertKeyVal<String, Double>());
- testNodeProcessingSchema(new ChangeAlertKeyVal<String, Float>());
- testNodeProcessingSchema(new ChangeAlertKeyVal<String, Short>());
- testNodeProcessingSchema(new ChangeAlertKeyVal<String, Long>());
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public <V extends Number> void testNodeProcessingSchema(
- ChangeAlertKeyVal<String, V> oper)
- {
- CollectorTestSink alertSink = new CollectorTestSink();
-
- oper.alert.setSink(alertSink);
- oper.setPercentThreshold(5);
-
- oper.beginWindow(0);
- oper.data.process(new KeyValPair<String, V>("a", oper.getValue(200)));
- oper.data.process(new KeyValPair<String, V>("b", oper.getValue(10)));
- oper.data.process(new KeyValPair<String, V>("c", oper.getValue(100)));
-
- oper.data.process(new KeyValPair<String, V>("a", oper.getValue(203)));
- oper.data.process(new KeyValPair<String, V>("b", oper.getValue(12)));
- oper.data.process(new KeyValPair<String, V>("c", oper.getValue(101)));
-
- oper.data.process(new KeyValPair<String, V>("a", oper.getValue(210)));
- oper.data.process(new KeyValPair<String, V>("b", oper.getValue(12)));
- oper.data.process(new KeyValPair<String, V>("c", oper.getValue(102)));
-
- oper.data.process(new KeyValPair<String, V>("a", oper.getValue(231)));
- oper.data.process(new KeyValPair<String, V>("b", oper.getValue(18)));
- oper.data.process(new KeyValPair<String, V>("c", oper.getValue(103)));
- oper.endWindow();
-
- // One for a, Two for b
- Assert.assertEquals("number emitted tuples", 3,
- alertSink.collectedTuples.size());
-
- double aval = 0;
- double bval = 0;
- log.debug("\nLogging tuples");
- for (Object o : alertSink.collectedTuples) {
- KeyValPair<String, KeyValPair<Number, Double>> map = (KeyValPair<String, KeyValPair<Number, Double>>)o;
-
- log.debug(o.toString());
- if (map.getKey().equals("a")) {
- KeyValPair<Number, Double> vmap = map.getValue();
- if (vmap != null) {
- aval += vmap.getValue().doubleValue();
- }
- } else {
- KeyValPair<Number, Double> vmap = map.getValue();
- if (vmap != null) {
- bval += vmap.getValue().doubleValue();
- }
- }
- }
- Assert.assertEquals("change in a", 10.0, aval,0);
- Assert.assertEquals("change in a", 70.0, bval,0);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeAlertMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertMapTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeAlertMapTest.java
deleted file mode 100644
index 51f52f0..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertMapTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.ChangeAlertMap}.
- * <p>
- *
- */
-public class ChangeAlertMapTest
-{
- private static Logger log = LoggerFactory.getLogger(ChangeAlertMapTest.class);
-
- /**
- * Test node logic emits correct results.
- */
- @Test
- public void testNodeProcessing() throws Exception
- {
- testNodeProcessingSchema(new ChangeAlertMap<String, Integer>());
- testNodeProcessingSchema(new ChangeAlertMap<String, Double>());
- testNodeProcessingSchema(new ChangeAlertMap<String, Float>());
- testNodeProcessingSchema(new ChangeAlertMap<String, Short>());
- testNodeProcessingSchema(new ChangeAlertMap<String, Long>());
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public <V extends Number> void testNodeProcessingSchema(
- ChangeAlertMap<String, V> oper)
- {
- CollectorTestSink alertSink = new CollectorTestSink();
-
- oper.alert.setSink(alertSink);
- oper.setPercentThreshold(5);
-
- oper.beginWindow(0);
- HashMap<String, V> input = new HashMap<String, V>();
- input.put("a", oper.getValue(200));
- input.put("b", oper.getValue(10));
- input.put("c", oper.getValue(100));
- oper.data.process(input);
-
- input.clear();
- input.put("a", oper.getValue(203));
- input.put("b", oper.getValue(12));
- input.put("c", oper.getValue(101));
- oper.data.process(input);
-
- input.clear();
- input.put("a", oper.getValue(210));
- input.put("b", oper.getValue(12));
- input.put("c", oper.getValue(102));
- oper.data.process(input);
-
- input.clear();
- input.put("a", oper.getValue(231));
- input.put("b", oper.getValue(18));
- input.put("c", oper.getValue(103));
- oper.data.process(input);
- oper.endWindow();
-
- // One for a, Two for b
- Assert.assertEquals("number emitted tuples", 3,
- alertSink.collectedTuples.size());
-
- double aval = 0;
- double bval = 0;
- log.debug("\nLogging tuples");
- for (Object o : alertSink.collectedTuples) {
- HashMap<String, HashMap<Number, Double>> map = (HashMap<String, HashMap<Number, Double>>)o;
- Assert.assertEquals("map size", 1, map.size());
- log.debug(o.toString());
- HashMap<Number, Double> vmap = map.get("a");
- if (vmap != null) {
- aval += vmap.get(231.0).doubleValue();
- }
- vmap = map.get("b");
- if (vmap != null) {
- if (vmap.get(12.0) != null) {
- bval += vmap.get(12.0).doubleValue();
- } else {
- bval += vmap.get(18.0).doubleValue();
- }
- }
- }
- Assert.assertEquals("change in a", 10.0, aval,0);
- Assert.assertEquals("change in a", 70.0, bval,0);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeAlertTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeAlertTest.java
deleted file mode 100644
index d127231..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.ChangeAlert}. <p>
- *
- */
-public class ChangeAlertTest
-{
- private static Logger log = LoggerFactory.getLogger(ChangeAlertTest.class);
-
- /**
- * Test node logic emits correct results.
- */
- @Test
- public void testNodeProcessing() throws Exception
- {
- testNodeProcessingSchema(new ChangeAlert<Integer>());
- testNodeProcessingSchema(new ChangeAlert<Double>());
- testNodeProcessingSchema(new ChangeAlert<Float>());
- testNodeProcessingSchema(new ChangeAlert<Short>());
- testNodeProcessingSchema(new ChangeAlert<Long>());
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public <V extends Number> void testNodeProcessingSchema(ChangeAlert<V> oper)
- {
- CollectorTestSink alertSink = new CollectorTestSink();
-
- oper.alert.setSink(alertSink);
- oper.setPercentThreshold(5);
-
- oper.beginWindow(0);
- oper.data.process(oper.getValue(10));
- oper.data.process(oper.getValue(12)); // alert
- oper.data.process(oper.getValue(12));
- oper.data.process(oper.getValue(18)); // alert
- oper.data.process(oper.getValue(0)); // alert
- oper.data.process(oper.getValue(20)); // this will not alert
- oper.data.process(oper.getValue(30)); // alert
-
- oper.endWindow();
-
- // One for a, Two for b
- Assert.assertEquals("number emitted tuples", 4, alertSink.collectedTuples.size());
-
- double aval = 0;
- log.debug("\nLogging tuples");
- for (Object o: alertSink.collectedTuples) {
- KeyValPair<Number, Double> map = (KeyValPair<Number, Double>)o;
- log.debug(o.toString());
- aval += map.getValue().doubleValue();
- }
- Assert.assertEquals("change in a", 220.0, aval,0);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeKeyValTest.java
deleted file mode 100644
index 6c2151b..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/ChangeKeyValTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.ChangeKeyVal}.
- * <p>
- *
- */
-public class ChangeKeyValTest
-{
- private static Logger log = LoggerFactory.getLogger(ChangeKeyValTest.class);
-
- /**
- * Test node logic emits correct results.
- */
- @Test
- public void testNodeProcessing() throws Exception
- {
- testNodeProcessingSchema(new ChangeKeyVal<String, Integer>());
- testNodeProcessingSchema(new ChangeKeyVal<String, Double>());
- testNodeProcessingSchema(new ChangeKeyVal<String, Float>());
- testNodeProcessingSchema(new ChangeKeyVal<String, Short>());
- testNodeProcessingSchema(new ChangeKeyVal<String, Long>());
- }
-
- /**
- *
- * @param oper
- * key/value pair for comparison.
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public <V extends Number> void testNodeProcessingSchema(
- ChangeKeyVal<String, V> oper)
- {
- CollectorTestSink changeSink = new CollectorTestSink();
- CollectorTestSink percentSink = new CollectorTestSink();
-
- oper.change.setSink(changeSink);
- oper.percent.setSink(percentSink);
-
- oper.beginWindow(0);
- oper.base.process(new KeyValPair<String, V>("a", oper.getValue(2)));
- oper.base.process(new KeyValPair<String, V>("b", oper.getValue(10)));
- oper.base.process(new KeyValPair<String, V>("c", oper.getValue(100)));
-
- oper.data.process(new KeyValPair<String, V>("a", oper.getValue(3)));
- oper.data.process(new KeyValPair<String, V>("b", oper.getValue(2)));
- oper.data.process(new KeyValPair<String, V>("c", oper.getValue(4)));
-
- oper.endWindow();
-
- // One for each key
- Assert.assertEquals("number emitted tuples", 3,
- changeSink.collectedTuples.size());
- Assert.assertEquals("number emitted tuples", 3,
- percentSink.collectedTuples.size());
-
- log.debug("\nLogging tuples");
- for (Object o : changeSink.collectedTuples) {
- KeyValPair<String, Number> kv = (KeyValPair<String, Number>)o;
- if (kv.getKey().equals("a")) {
- Assert.assertEquals("change in a ", 1.0, kv.getValue());
- }
- if (kv.getKey().equals("b")) {
- Assert.assertEquals("change in b ", -8.0, kv.getValue());
- }
- if (kv.getKey().equals("c")) {
- Assert.assertEquals("change in c ", -96.0, kv.getValue());
- }
- }
-
- for (Object o : percentSink.collectedTuples) {
- KeyValPair<String, Number> kv = (KeyValPair<String, Number>)o;
- if (kv.getKey().equals("a")) {
- Assert.assertEquals("change in a ", 50.0, kv.getValue());
- }
- if (kv.getKey().equals("b")) {
- Assert.assertEquals("change in b ", -80.0, kv.getValue());
- }
- if (kv.getKey().equals("c")) {
- Assert.assertEquals("change in c ", -96.0, kv.getValue());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeTest.java
deleted file mode 100644
index 9595a5d..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/ChangeTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.Change}.
- * <p>
- *
- */
-public class ChangeTest
-{
- private static Logger log = LoggerFactory.getLogger(ChangeTest.class);
-
- /**
- * Test node logic emits correct results.
- */
- @Test
- public void testNodeProcessing() throws Exception
- {
- testNodeProcessingSchema(new Change<Integer>());
- testNodeProcessingSchema(new Change<Double>());
- testNodeProcessingSchema(new Change<Float>());
- testNodeProcessingSchema(new Change<Short>());
- testNodeProcessingSchema(new Change<Long>());
- }
-
- /**
- *
- * @param oper Data value for comparison.
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public <V extends Number> void testNodeProcessingSchema(Change<V> oper)
- {
- CollectorTestSink changeSink = new CollectorTestSink();
- CollectorTestSink percentSink = new CollectorTestSink();
-
- oper.change.setSink(changeSink);
- oper.percent.setSink(percentSink);
-
- oper.beginWindow(0);
- oper.base.process(oper.getValue(10));
- oper.data.process(oper.getValue(5));
- oper.data.process(oper.getValue(15));
- oper.data.process(oper.getValue(20));
- oper.endWindow();
-
- Assert.assertEquals("number emitted tuples", 3,
- changeSink.collectedTuples.size());
- Assert.assertEquals("number emitted tuples", 3,
- percentSink.collectedTuples.size());
-
- log.debug("\nLogging tuples");
- for (Object o : changeSink.collectedTuples) {
- log.debug(String.format("change %s", o));
- }
- for (Object o : percentSink.collectedTuples) {
- log.debug(String.format("percent change %s", o));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/CompareExceptMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/CompareExceptMapTest.java b/library/src/test/java/com/datatorrent/lib/math/CompareExceptMapTest.java
deleted file mode 100644
index 46b9609..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/CompareExceptMapTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.CompareExceptMap}<p>
- *
- */
-public class CompareExceptMapTest
-{
- /**
- * Test node logic emits correct results
- */
- @Test
- public void testNodeProcessing() throws Exception
- {
- testNodeProcessingSchema(new CompareExceptMap<String, Integer>());
- testNodeProcessingSchema(new CompareExceptMap<String, Double>());
- testNodeProcessingSchema(new CompareExceptMap<String, Float>());
- testNodeProcessingSchema(new CompareExceptMap<String, Short>());
- testNodeProcessingSchema(new CompareExceptMap<String, Long>());
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void testNodeProcessingSchema(CompareExceptMap oper)
- {
- CountAndLastTupleTestSink compareSink = new CountAndLastTupleTestSink();
- CountAndLastTupleTestSink exceptSink = new CountAndLastTupleTestSink();
- oper.compare.setSink(compareSink);
- oper.except.setSink(exceptSink);
-
- oper.setKey("a");
- oper.setValue(3.0);
- oper.setTypeEQ();
-
- oper.beginWindow(0);
- HashMap<String, Number> input = new HashMap<String, Number>();
- input.put("a", 2);
- input.put("b", 20);
- input.put("c", 1000);
- oper.data.process(input);
- input.clear();
- input.put("a", 3);
- input.put("b", 21);
- input.put("c", 30);
- oper.data.process(input);
- oper.endWindow();
-
- // One for each key
- Assert.assertEquals("number emitted tuples", 1, exceptSink.count);
- for (Map.Entry<String, Number> e : ((HashMap<String, Number>)exceptSink.tuple).entrySet()) {
- if (e.getKey().equals("a")) {
- Assert.assertEquals("emitted value for 'a' was ", new Double(2), e.getValue().doubleValue(), 0);
- } else if (e.getKey().equals("b")) {
- Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), e.getValue().doubleValue(), 0);
- } else if (e.getKey().equals("c")) {
- Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), e.getValue().doubleValue(), 0);
- }
- }
-
- Assert.assertEquals("number emitted tuples", 1, compareSink.count);
- for (Map.Entry<String, Number> e : ((HashMap<String, Number>)compareSink.tuple).entrySet()) {
- if (e.getKey().equals("a")) {
- Assert.assertEquals("emitted value for 'a' was ", new Double(3), e.getValue().doubleValue(), 0);
- } else if (e.getKey().equals("b")) {
- Assert.assertEquals("emitted tuple for 'b' was ", new Double(21), e.getValue().doubleValue(), 0);
- } else if (e.getKey().equals("c")) {
- Assert.assertEquals("emitted tuple for 'c' was ", new Double(30), e.getValue().doubleValue(), 0);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/CompareMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/CompareMapTest.java b/library/src/test/java/com/datatorrent/lib/math/CompareMapTest.java
deleted file mode 100644
index c21019b..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/CompareMapTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.CompareMap}<p>
- *
- */
-public class CompareMapTest
-{
- /**
- * Test node logic emits correct results
- */
- @Test
- public void testNodeProcessing() throws Exception
- {
- testNodeProcessingSchema(new CompareMap<String, Integer>());
- testNodeProcessingSchema(new CompareMap<String, Double>());
- testNodeProcessingSchema(new CompareMap<String, Float>());
- testNodeProcessingSchema(new CompareMap<String, Short>());
- testNodeProcessingSchema(new CompareMap<String, Long>());
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void testNodeProcessingSchema(CompareMap oper)
- {
- CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink();
- oper.compare.setSink(matchSink);
- oper.setKey("a");
- oper.setValue(3.0);
- oper.setTypeNEQ();
-
- oper.beginWindow(0);
- HashMap<String, Number> input = new HashMap<String, Number>();
-
- input.put("a", 2);
- input.put("b", 20);
- input.put("c", 1000);
- oper.data.process(input);
- input.clear();
- input.put("a", 3);
- oper.data.process(input);
- oper.endWindow();
-
- // One for each key
- Assert.assertEquals("number emitted tuples", 1, matchSink.count);
- for (Map.Entry<String, Number> e : ((HashMap<String, Number>)matchSink.tuple).entrySet()) {
- if (e.getKey().equals("a")) {
- Assert.assertEquals("emitted value for 'a' was ", new Double(2), e.getValue().doubleValue(), 0);
- } else if (e.getKey().equals("b")) {
- Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), e.getValue().doubleValue(), 0);
- } else if (e.getKey().equals("c")) {
- Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), e.getValue().doubleValue(), 0);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/CountKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/CountKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/CountKeyValTest.java
deleted file mode 100644
index 0b1d9de..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/CountKeyValTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.CountKeyVal}. <p>
- *
- */
-public class CountKeyValTest
-{
- /**
- * Test operator logic emits correct results.
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testNodeProcessing()
- {
- CountKeyVal<String, Double> oper = new CountKeyVal<String, Double>();
- CollectorTestSink countSink = new CollectorTestSink();
- oper.count.setSink(countSink);
-
- oper.beginWindow(0); //
-
- oper.data.process(new KeyValPair("a", 2.0));
- oper.data.process(new KeyValPair("b", 20.0));
- oper.data.process(new KeyValPair("c", 1000.0));
- oper.data.process(new KeyValPair("a", 1.0));
- oper.data.process(new KeyValPair("a", 10.0));
- oper.data.process(new KeyValPair("b", 5.0));
- oper.data.process(new KeyValPair("d", 55.0));
- oper.data.process(new KeyValPair("b", 12.0));
- oper.data.process(new KeyValPair("d", 22.0));
- oper.data.process(new KeyValPair("d", 14.2));
- oper.data.process(new KeyValPair("d", 46.0));
- oper.data.process(new KeyValPair("e", 2.0));
- oper.data.process(new KeyValPair("a", 23.0));
- oper.data.process(new KeyValPair("d", 4.0));
-
- oper.endWindow(); //
-
- // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
- Assert.assertEquals("number emitted tuples", 5, countSink.collectedTuples.size());
- for (Object o : countSink.collectedTuples) {
- KeyValPair<String, Integer> e = (KeyValPair<String, Integer>)o;
- Integer val = (Integer)e.getValue();
- if (e.getKey().equals("a")) {
- Assert.assertEquals("emitted value for 'a' was ", 4, val.intValue());
- } else if (e.getKey().equals("b")) {
- Assert.assertEquals("emitted tuple for 'b' was ", 3, val.intValue());
- } else if (e.getKey().equals("c")) {
- Assert.assertEquals("emitted tuple for 'c' was ", 1, val.intValue());
- } else if (e.getKey().equals("d")) {
- Assert.assertEquals("emitted tuple for 'd' was ", 5, val.intValue());
- } else if (e.getKey().equals("e")) {
- Assert.assertEquals("emitted tuple for 'e' was ", 1, val.intValue());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java b/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java
index 9fe556f..c7e7c65 100644
--- a/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java
@@ -29,6 +29,7 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
* <p>
*
*/
+
public class DivisionTest
{
/**
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ExceptMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/ExceptMapTest.java b/library/src/test/java/com/datatorrent/lib/math/ExceptMapTest.java
deleted file mode 100644
index a113d5f..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/ExceptMapTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- * Functional tests for {@link com.datatorrent.lib.math.ExceptMap}
- */
-public class ExceptMapTest
-{
- /**
- * Test node logic emits correct results
- */
- @Test
- public void testNodeProcessing() throws Exception
- {
- testNodeProcessingSchema(new ExceptMap<String, Integer>());
- testNodeProcessingSchema(new ExceptMap<String, Double>());
- testNodeProcessingSchema(new ExceptMap<String, Float>());
- testNodeProcessingSchema(new ExceptMap<String, Short>());
- testNodeProcessingSchema(new ExceptMap<String, Long>());
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- public void testNodeProcessingSchema(ExceptMap oper)
- {
- CountAndLastTupleTestSink exceptSink = new CountAndLastTupleTestSink();
- oper.except.setSink(exceptSink);
- oper.setKey("a");
- oper.setValue(3.0);
- oper.setTypeEQ();
-
- oper.beginWindow(0);
- HashMap<String, Number> input = new HashMap<String, Number>();
- input.put("a", 2);
- input.put("b", 20);
- input.put("c", 1000);
- oper.data.process(input);
- input.clear();
- input.put("a", 3);
- oper.data.process(input);
- oper.endWindow();
-
- // One for each key
- Assert.assertEquals("number emitted tuples", 1, exceptSink.count);
- for (Map.Entry<String, Number> e : ((HashMap<String, Number>)exceptSink.tuple)
- .entrySet()) {
- if (e.getKey().equals("a")) {
- Assert.assertEquals("emitted value for 'a' was ", new Double(2), e
- .getValue().doubleValue(), 0);
- } else if (e.getKey().equals("b")) {
- Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), e
- .getValue().doubleValue(), 0);
- } else if (e.getKey().equals("c")) {
- Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), e
- .getValue().doubleValue(), 0);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java b/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java
index 68e89eb..4eb6a1b 100644
--- a/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java
@@ -24,8 +24,10 @@ import org.junit.Test;
import com.datatorrent.lib.testbench.SumTestSink;
/**
+ *
* Functional tests for {@link com.datatorrent.lib.math.MultiplyByConstant}
*/
+
public class MultiplyByConstantTest
{
/**
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java b/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java
deleted file mode 100644
index 92c0e77..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- * Functional tests for {@link com.datatorrent.lib.math.QuotientMap}
- */
-public class QuotientMapTest
-{
- private static Logger LOG = LoggerFactory.getLogger(QuotientMap.class);
-
- /**
- * Test node logic emits correct results
- */
- @Test
- public void testNodeProcessing() throws Exception
- {
- testNodeProcessingSchema(new QuotientMap<String, Integer>());
- testNodeProcessingSchema(new QuotientMap<String, Double>());
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public void testNodeProcessingSchema(QuotientMap oper) throws Exception
- {
- CountAndLastTupleTestSink quotientSink = new CountAndLastTupleTestSink();
-
- oper.quotient.setSink(quotientSink);
- oper.setMult_by(2);
-
- oper.beginWindow(0); //
- HashMap<String, Number> input = new HashMap<String, Number>();
- int numtuples = 100;
- for (int i = 0; i < numtuples; i++) {
- input.clear();
- input.put("a", 2);
- input.put("b", 20);
- input.put("c", 1000);
- oper.numerator.process(input);
- input.clear();
- input.put("a", 2);
- input.put("b", 40);
- input.put("c", 500);
- oper.denominator.process(input);
- }
-
- oper.endWindow();
-
- // One for each key
- Assert.assertEquals("number emitted tuples", 1, quotientSink.count);
- HashMap<String, Number> output = (HashMap<String, Number>)quotientSink.tuple;
- for (Map.Entry<String, Number> e : output.entrySet()) {
- if (e.getKey().equals("a")) {
- Assert.assertEquals("emitted value for 'a' was ", 2d,
- e.getValue());
- } else if (e.getKey().equals("b")) {
- Assert.assertEquals("emitted tuple for 'b' was ", 1d,
- e.getValue());
- } else if (e.getKey().equals("c")) {
- Assert.assertEquals("emitted tuple for 'c' was ", 4d,
- e.getValue());
- } else {
- LOG.debug(String.format("key was %s", e.getKey()));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java b/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java
deleted file mode 100644
index 604e45f..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.api.Sink;
-
-/**
- * Functional tests for {@link com.datatorrent.lib.math.Quotient}
- */
-public class QuotientTest
-{
-
- class TestSink implements Sink<Object>
- {
- List<Object> collectedTuples = new ArrayList<Object>();
-
- @Override
- public void put(Object payload)
- {
- collectedTuples.add(payload);
- }
-
- @Override
- public int getCount(boolean reset)
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
- }
-
- /**
- * Test oper logic emits correct results.
- */
- @Test
- public void testNodeSchemaProcessing()
- {
- Quotient<Double> oper = new Quotient<Double>();
- TestSink quotientSink = new TestSink();
- oper.quotient.setSink(quotientSink);
-
- oper.setMult_by(2);
-
- oper.beginWindow(0); //
- Double a = 30.0;
- Double b = 20.0;
- Double c = 100.0;
- oper.denominator.process(a);
- oper.denominator.process(b);
- oper.denominator.process(c);
-
- a = 5.0;
- oper.numerator.process(a);
- a = 1.0;
- oper.numerator.process(a);
- b = 44.0;
- oper.numerator.process(b);
-
- b = 10.0;
- oper.numerator.process(b);
- c = 22.0;
- oper.numerator.process(c);
- c = 18.0;
- oper.numerator.process(c);
-
- a = 0.5;
- oper.numerator.process(a);
- b = 41.5;
- oper.numerator.process(b);
- a = 8.0;
- oper.numerator.process(a);
- oper.endWindow(); //
-
- // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
- Assert.assertEquals("number emitted tuples", 1,
- quotientSink.collectedTuples.size());
- for (Object o : quotientSink.collectedTuples) { // sum is 1157
- Double val = (Double)o;
- Assert.assertEquals("emitted quotient value was ", new Double(2.0), val);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java b/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java
index e968dba..f74e0c9 100644
--- a/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java
@@ -26,11 +26,12 @@ import org.junit.Test;
import com.datatorrent.lib.testbench.SumTestSink;
/**
- *
+ *
* Functional tests for {@link com.datatorrent.lib.math.Sigma}
* <p>
*
*/
+
public class SigmaTest
{
/**
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java b/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java
deleted file mode 100644
index b0c7b38..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional tests for {@link com.datatorrent.lib.math.SumCountMap}.
- */
-public class SumCountMapTest
-{
- /**
- * Test operator logic emits correct results.
- */
- @Test
- public void testNodeProcessing()
- {
- testNodeSchemaProcessing(true, true);
- testNodeSchemaProcessing(true, false);
- testNodeSchemaProcessing(false, true);
- testNodeSchemaProcessing(false, false);
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public void testNodeSchemaProcessing(boolean sum, boolean count)
- {
- SumCountMap<String, Double> oper = new SumCountMap<String, Double>();
- oper.setType(Double.class);
- CollectorTestSink sumSink = new CollectorTestSink();
- CollectorTestSink countSink = new CollectorTestSink();
- if (sum) {
- oper.sum.setSink(sumSink);
- }
- if (count) {
- oper.count.setSink(countSink);
- }
-
- oper.beginWindow(0); //
-
- HashMap<String, Double> input = new HashMap<String, Double>();
-
- input.put("a", 2.0);
- input.put("b", 20.0);
- input.put("c", 1000.0);
- oper.data.process(input);
- input.clear();
- input.put("a", 1.0);
- oper.data.process(input);
- input.clear();
- input.put("a", 10.0);
- input.put("b", 5.0);
- oper.data.process(input);
- input.clear();
- input.put("d", 55.0);
- input.put("b", 12.0);
- oper.data.process(input);
- input.clear();
- input.put("d", 22.0);
- oper.data.process(input);
- input.clear();
- input.put("d", 14.2);
- oper.data.process(input);
- input.clear();
-
- // Mix integers and doubles
- HashMap<String, Double> inputi = new HashMap<String, Double>();
- inputi.put("d", 46.0);
- inputi.put("e", 2.0);
- oper.data.process(inputi);
- inputi.clear();
- inputi.put("a", 23.0);
- inputi.put("d", 4.0);
- oper.data.process(inputi);
- inputi.clear();
-
- oper.endWindow(); //
-
- if (sum) {
- // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
- Assert.assertEquals("number emitted tuples", 1, sumSink.collectedTuples.size());
-
- for (Object o : sumSink.collectedTuples) {
- HashMap<String, Object> output = (HashMap<String, Object>)o;
- for (Map.Entry<String, Object> e : output.entrySet()) {
- Double val = (Double)e.getValue();
- if (e.getKey().equals("a")) {
- Assert.assertEquals("emitted value for 'a' was ", new Double(36),
- val);
- } else if (e.getKey().equals("b")) {
- Assert.assertEquals("emitted tuple for 'b' was ", new Double(37),
- val);
- } else if (e.getKey().equals("c")) {
- Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000),
- val);
- } else if (e.getKey().equals("d")) {
- Assert.assertEquals("emitted tuple for 'd' was ",
- new Double(141.2), val);
- } else if (e.getKey().equals("e")) {
- Assert.assertEquals("emitted tuple for 'e' was ", new Double(2),
- val);
- }
- }
- }
- }
- if (count) {
- // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
- Assert.assertEquals("number emitted tuples", 1, countSink.collectedTuples.size());
- for (Object o : countSink.collectedTuples) {
- HashMap<String, Object> output = (HashMap<String, Object>)o;
- for (Map.Entry<String, Object> e : output.entrySet()) {
- Integer val = (Integer)e.getValue();
- if (e.getKey().equals("a")) {
- Assert
- .assertEquals("emitted value for 'a' was ", 4, val.intValue());
- } else if (e.getKey().equals("b")) {
- Assert
- .assertEquals("emitted tuple for 'b' was ", 3, val.intValue());
- } else if (e.getKey().equals("c")) {
- Assert
- .assertEquals("emitted tuple for 'c' was ", 1, val.intValue());
- } else if (e.getKey().equals("d")) {
- Assert
- .assertEquals("emitted tuple for 'd' was ", 5, val.intValue());
- } else if (e.getKey().equals("e")) {
- Assert
- .assertEquals("emitted tuple for 'e' was ", 1, val.intValue());
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
deleted file mode 100644
index 1f29d1d..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional test for {@link com.datatorrent.lib.streamquery.DeleteOperator}.
- */
-public class DeleteOperatorTest
-{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSqlSelect()
- {
- // create operator
- DeleteOperator oper = new DeleteOperator();
-
- EqualValueCondition condition = new EqualValueCondition();
- condition.addEqualValue("a", 1);
- oper.setCondition(condition);
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- LOG.debug("{}", sink.collectedTuples);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(DeleteOperatorTest.class);
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
deleted file mode 100644
index 728fb96..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class FullOuterJoinOperatorTest
-{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSqlSelect()
- {
- // create operator
- OuterJoinOperator oper = new OuterJoinOperator();
- oper.setFullJoin(true);
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- // set column join condition
- Condition cond = new JoinColumnEqualCondition("a", "a");
- oper.setJoinCondition(cond);
-
- // add columns
- oper.selectTable1Column(new ColumnIndex("b", null));
- oper.selectTable2Column(new ColumnIndex("c", null));
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 2);
- tuple.put("b", 11);
- tuple.put("c", 12);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 7);
- tuple.put("c", 8);
- oper.inport2.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport2.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- LOG.debug("{}", sink.collectedTuples);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(FullOuterJoinOperatorTest.class);
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
deleted file mode 100644
index 714f93b..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
-import com.datatorrent.lib.streamquery.function.SumFunction;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional test for {@link com.datatorrent.lib.streamquery.GroupByOperatorTest}.
- */
-public class GroupByOperatorTest
-{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSqlGroupBy()
- {
- // create operator
- GroupByHavingOperator oper = new GroupByHavingOperator();
- oper.addColumnGroupByIndex(new ColumnIndex("b", null));
- try {
- oper.addAggregateIndex(new SumFunction("c", null));
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- return;
- }
-
- EqualValueCondition condition = new EqualValueCondition();
- condition.addEqualValue("a", 1);
- oper.setCondition(condition);
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 1);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 2);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 2);
- tuple.put("c", 7);
- oper.inport.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- LOG.debug("{}", sink.collectedTuples);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(GroupByOperatorTest.class);
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
deleted file mode 100644
index e11723d..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
-import com.datatorrent.lib.streamquery.condition.HavingCompareValue;
-import com.datatorrent.lib.streamquery.condition.HavingCondition;
-import com.datatorrent.lib.streamquery.function.FunctionIndex;
-import com.datatorrent.lib.streamquery.function.SumFunction;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional test for {@link com.datatorrent.lib.streamquery.HavingOperatorTest}.
- */
-public class HavingOperatorTest
-{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSqlGroupBy() throws Exception
- {
- // create operator
- GroupByHavingOperator oper = new GroupByHavingOperator();
- oper.addColumnGroupByIndex(new ColumnIndex("b", null));
- FunctionIndex sum = new SumFunction("c", null);
- oper.addAggregateIndex(sum);
-
- // create having condition
- HavingCondition having = new HavingCompareValue<Double>(sum, 6.0, 0);
- oper.addHavingCondition(having);
-
- EqualValueCondition condition = new EqualValueCondition();
- condition.addEqualValue("a", 1);
- oper.setCondition(condition);
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 1);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 2);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 2);
- tuple.put("c", 7);
- oper.inport.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- LOG.debug("{}", sink.collectedTuples);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(HavingOperatorTest.class);
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
deleted file mode 100644
index 8a022ee..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional test for {@link com.datatorrent.lib.streamquery.InnerJoinOperator }.
- *
- */
-public class InnerJoinOperatorTest
-{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSqlSelect()
- {
- // create operator
- InnerJoinOperator oper = new InnerJoinOperator();
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- // set column join condition
- Condition cond = new JoinColumnEqualCondition("a", "a");
- oper.setJoinCondition(cond);
-
- // add columns
- oper.selectTable1Column(new ColumnIndex("b", null));
- oper.selectTable2Column(new ColumnIndex("c", null));
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 7);
- tuple.put("c", 8);
- oper.inport2.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport2.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- LOG.debug("{}", sink.collectedTuples);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(InnerJoinOperatorTest.class);
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
deleted file mode 100644
index aa25e87..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class LeftOuterJoinOperatorTest
-{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSqlSelect()
- {
- // create operator
- OuterJoinOperator oper = new OuterJoinOperator();
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- // set column join condition
- Condition cond = new JoinColumnEqualCondition("a", "a");
- oper.setJoinCondition(cond);
-
- // add columns
- oper.selectTable1Column(new ColumnIndex("b", null));
- oper.selectTable2Column(new ColumnIndex("c", null));
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 2);
- tuple.put("b", 11);
- tuple.put("c", 12);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 7);
- tuple.put("c", 8);
- oper.inport2.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport2.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- LOG.debug("{}", sink.collectedTuples);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(LeftOuterJoinOperatorTest.class);
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
deleted file mode 100644
index 2d7ba87..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional test for {@link com.datatorrent.lib.streamquery.OrderByOperatorTest}.
- */
-public class OrderByOperatorTest
-{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSqlSelect()
- {
- // craete operator
- OrderByOperator oper = new OrderByOperator();
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
- oper.addOrderByRule(new OrderByRule<Integer>("b"));
- oper.setDescending(true);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("c", 2);
- tuple.put("a", 0);
- tuple.put("b", 1);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 2);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 2);
- tuple.put("b", 6);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 4);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 8);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- LOG.debug("{}", sink.collectedTuples);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(OrderByOperatorTest.class);
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
deleted file mode 100644
index 3a57427..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class RightOuterJoinOperatorTest
-{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSqlSelect()
- {
- // create operator
- OuterJoinOperator oper = new OuterJoinOperator();
- oper.setRighttJoin();
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- // set column join condition
- Condition cond = new JoinColumnEqualCondition("a", "a");
- oper.setJoinCondition(cond);
-
- // add columns
- oper.selectTable1Column(new ColumnIndex("b", null));
- oper.selectTable2Column(new ColumnIndex("c", null));
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport1.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport1.process(tuple);
-
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 7);
- tuple.put("c", 8);
- oper.inport2.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport2.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 2);
- tuple.put("b", 11);
- tuple.put("c", 12);
- oper.inport2.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- LOG.debug("{}", sink.collectedTuples);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(RightOuterJoinOperatorTest.class);
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
deleted file mode 100644
index 8e6620e..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional test for {@link com.datatorrent.lib.streamquery.SelectOperatorTest}.
- */
-public class SelectOperatorTest
-{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSqlSelect()
- {
- // create operator
- SelectOperator oper = new SelectOperator();
- oper.addIndex(new ColumnIndex("b", null));
- oper.addIndex(new ColumnIndex("c", null));
-
- EqualValueCondition condition = new EqualValueCondition();
- condition.addEqualValue("a", 1);
- oper.setCondition(condition);
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- LOG.debug("{}", sink.collectedTuples);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(SelectOperatorTest.class);
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
deleted file mode 100644
index c92c6c1..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class SelectTopOperatorTest
-{
- @SuppressWarnings({"rawtypes", "unchecked"})
- @Test
- public void testOperator() throws Exception
- {
- SelectTopOperator oper = new SelectTopOperator();
- oper.setTopValue(2);
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- oper.beginWindow(1);
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport.process(tuple);
- oper.endWindow();
-
- LOG.debug("{}", sink.collectedTuples);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(SelectTopOperatorTest.class);
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
deleted file mode 100644
index 42af56b..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class UpdateOperatorTest
-{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSqlSelect()
- {
- // create operator
- UpdateOperator oper = new UpdateOperator();
-
- EqualValueCondition condition = new EqualValueCondition();
- condition.addEqualValue("a", 1);
- oper.setCondition(condition);
- oper.addUpdate("c", 100);
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- LOG.debug("{}", sink.collectedTuples);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(UpdateOperatorTest.class);
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java
deleted file mode 100644
index b0500eb..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.advanced;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.SelectOperator;
-import com.datatorrent.lib.streamquery.condition.BetweenCondition;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional test for {@link com.datatorrent.lib.streamquery.advanced.BetweenConditionTest}.
- */
-public class BetweenConditionTest
-{
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSqlSelect()
- {
- // create operator
- SelectOperator oper = new SelectOperator();
- oper.addIndex(new ColumnIndex("b", null));
- oper.addIndex(new ColumnIndex("c", null));
-
- BetweenCondition cond = new BetweenCondition("a", 0, 2);
- oper.setCondition(cond);
-
-
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
-
- oper.setup(null);
- oper.beginWindow(1);
-
- HashMap<String, Object> tuple = new HashMap<String, Object>();
- tuple.put("a", 0);
- tuple.put("b", 1);
- tuple.put("c", 2);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 1);
- tuple.put("b", 3);
- tuple.put("c", 4);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 2);
- tuple.put("b", 5);
- tuple.put("c", 6);
- oper.inport.process(tuple);
-
- tuple = new HashMap<String, Object>();
- tuple.put("a", 3);
- tuple.put("b", 7);
- tuple.put("c", 8);
- oper.inport.process(tuple);
-
- oper.endWindow();
- oper.teardown();
-
- LOG.debug("{}", sink.collectedTuples);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(BetweenConditionTest.class);
-}