You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/07/26 16:16:54 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2149 #resolve #comment Added
unit tests for FunctionOperator (Map, FlatMap, Filter)
Repository: apex-malhar
Updated Branches:
refs/heads/master 2d48548da -> d06b2d987
APEXMALHAR-2149 #resolve #comment Added unit tests for FunctionOperator (Map, FlatMap, Filter)
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b5f9b88b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b5f9b88b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b5f9b88b
Branch: refs/heads/master
Commit: b5f9b88b74145a335de552e0f4f3ea73bfd14ffa
Parents: 2397038
Author: Shunxin <lu...@hotmail.com>
Authored: Mon Jul 25 16:41:34 2016 -0700
Committer: Shunxin <lu...@hotmail.com>
Committed: Mon Jul 25 16:41:34 2016 -0700
----------------------------------------------------------------------
.../FunctionOperator/FunctionOperatorTest.java | 332 +++++++++++++++++++
1 file changed, 332 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5f9b88b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
new file mode 100644
index 0000000..34820b6
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.stream.FunctionOperator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.StramLocalCluster;
+
+
+/**
+ * Unit tests for FunctionOperator(Map, FlapMap, Filter).
+ */
+
+public class FunctionOperatorTest
+{
+ private static final int NumTuples = 10;
+ private static final int NumFlatMapTuples = 100;
+ private static final int divider = 2;
+ private static final int listSize = 10;
+ private static int TupleCount;
+ private static int sum;
+
+ //Sample operator to generate continuous integers in lists for FlapMap testing.
+ public static class NumberListGenerator extends BaseOperator implements InputOperator
+ {
+ private int numMem;
+ private List<Integer> nums;
+
+ public final transient DefaultOutputPort<List<Integer>> output = new DefaultOutputPort<List<Integer>>();
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ numMem = 0;
+ nums = new ArrayList<Integer>();
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ nums.add(numMem);
+ numMem++;
+ if (numMem < NumFlatMapTuples && nums.size() < listSize) {
+ output.emit(nums);
+ nums.clear();
+ }
+ }
+ }
+
+ //Sample operator to generate continuous integers for filter and map testing.
+ public static class NumberGenerator extends BaseOperator implements InputOperator
+ {
+ private int num;
+
+ public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ num = 0;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (num < NumTuples) {
+ output.emit(num);
+ num++;
+ }
+ }
+ }
+
+ public static class ResultCollector extends BaseOperator
+ {
+
+ public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
+ {
+ @Override
+ public void process(Integer in)
+ {
+ TupleCount++;
+ sum += in;
+ }
+ };
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ TupleCount = 0;
+ sum = 0;
+ }
+
+ }
+
+
+ public static class FmFunction implements Function.FlatMapFunction<List<Integer>, Integer>
+ {
+ @Override
+ public Iterable<Integer> f(List<Integer> input)
+ {
+ ArrayList<Integer> result = new ArrayList<Integer>();
+ for (int in : input) {
+ if (in % 13 == 0 || in % 17 == 0) {
+ result.add(in * in);
+ }
+ }
+ return result;
+ }
+ }
+
+ public static class Square implements Function.MapFunction<Integer, Integer>
+ {
+ @Override
+ public Integer f(Integer input)
+ {
+ return input * input;
+ }
+ }
+
+
+ @Test
+ public void testMapOperator() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator());
+ FunctionOperator.MapFunctionOperator<Integer, Integer> mapper
+ = dag.addOperator("mapper", new FunctionOperator.MapFunctionOperator<Integer, Integer>(new Square()));
+ ResultCollector collector = dag.addOperator("collector", new ResultCollector());
+
+ dag.addStream("raw numbers", numGen.output, mapper.input);
+ dag.addStream("mapped results", mapper.output, collector.input);
+
+ // Create local cluster
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return TupleCount == NumTuples;
+ }
+ });
+
+ lc.run(5000);
+
+ Assert.assertEquals(sum, 285);
+ }
+
+ @Test
+ public void testMapOperatorStream() throws Exception
+ {
+ NumberGenerator numGen = new NumberGenerator();
+ ResultCollector collector = new ResultCollector();
+
+ ApexStream<Integer> nums = StreamFactory.fromInput(numGen, numGen.output)
+ .map(new Square());
+
+ nums.addOperator(collector, collector.input, null)
+ .runEmbedded(false, 10000, new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return TupleCount == NumTuples;
+ }
+ });
+
+ Assert.assertEquals(sum, 285);
+ }
+
+
+ @Test
+ public void testFlatMapOperator() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ NumberListGenerator numGen = dag.addOperator("numGen", new NumberListGenerator());
+ FunctionOperator.FlatMapFunctionOperator<List<Integer>, Integer> fm
+ = dag.addOperator("flatmap", new FunctionOperator.FlatMapFunctionOperator<>(new FmFunction()));
+ ResultCollector collector = dag.addOperator("collector", new ResultCollector());
+
+ dag.addStream("raw numbers", numGen.output, fm.input);
+ dag.addStream("flatmap results", fm.output, collector.input);
+
+ // Create local cluster
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return TupleCount == 13;
+ }
+ });
+
+ lc.run(5000);
+
+ Assert.assertEquals(sum, 39555);
+ }
+
+ @Test
+ public void testFlatMapOperatorStream() throws Exception
+ {
+ NumberListGenerator numGen = new NumberListGenerator();
+ ResultCollector collector = new ResultCollector();
+
+ ApexStream<Integer> numLists = StreamFactory.fromInput(numGen, numGen.output)
+ .flatMap(new FmFunction());
+ numLists.addOperator(collector, collector.input, null)
+ .runEmbedded(false, 10000, new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return TupleCount == 13;
+ }
+ });
+ Assert.assertEquals(sum, 39555);
+ }
+
+ @Test
+ public void testFilterOperator() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ FunctionOperator.FilterFunctionOperator<Integer> filter0
+ = new FunctionOperator.FilterFunctionOperator<Integer>(new Function.FilterFunction<Integer>()
+ {
+ @Override
+ public Boolean f(Integer in)
+ {
+ return in % divider == 0;
+ }
+ });
+
+ NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator());
+ FunctionOperator.FilterFunctionOperator<Integer> filter = dag.addOperator("filter", filter0);
+ ResultCollector collector = dag.addOperator("collector", new ResultCollector());
+
+ dag.addStream("raw numbers", numGen.output, filter.input);
+ dag.addStream("filtered results", filter.output, collector.input);
+
+ // Create local cluster
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return TupleCount == NumTuples / divider;
+ }
+ });
+
+ lc.run(5000);
+ Assert.assertEquals(sum, 20);
+ }
+
+ @Test
+ public void testFilterOperatorStream() throws Exception
+ {
+ NumberGenerator numGen = new NumberGenerator();
+ ResultCollector collector = new ResultCollector();
+
+ ApexStream<Integer> nums = StreamFactory.fromInput(numGen, numGen.output)
+ .filter(new Function.FilterFunction<Integer>()
+ {
+ @Override
+ public Boolean f(Integer in)
+ {
+ return in % divider == 0;
+ }
+ });
+
+ nums.addOperator(collector, collector.input, null)
+ .runEmbedded(false, 10000, new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return TupleCount == NumTuples / divider;
+ }
+ });
+
+ Assert.assertEquals(sum, 20);
+ }
+
+
+}
[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2149' of
github.com:ShunxinLu/apex-malhar
Posted by hs...@apache.org.
Merge branch 'APEXMALHAR-2149' of github.com:ShunxinLu/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d06b2d98
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d06b2d98
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d06b2d98
Branch: refs/heads/master
Commit: d06b2d9871830be680dfb7ce2a2deef08ae0b815
Parents: 2d48548 b5f9b88
Author: Siyuan Hua <hs...@apache.org>
Authored: Tue Jul 26 09:16:09 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Tue Jul 26 09:16:09 2016 -0700
----------------------------------------------------------------------
.../FunctionOperator/FunctionOperatorTest.java | 332 +++++++++++++++++++
1 file changed, 332 insertions(+)
----------------------------------------------------------------------