You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/07/26 16:16:54 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2149 #resolve #comment Added unit tests for FunctionOperator (Map, FlatMap, Filter)

Repository: apex-malhar
Updated Branches:
  refs/heads/master 2d48548da -> d06b2d987


APEXMALHAR-2149 #resolve #comment Added unit tests for FunctionOperator (Map, FlatMap, Filter)


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b5f9b88b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b5f9b88b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b5f9b88b

Branch: refs/heads/master
Commit: b5f9b88b74145a335de552e0f4f3ea73bfd14ffa
Parents: 2397038
Author: Shunxin <lu...@hotmail.com>
Authored: Mon Jul 25 16:41:34 2016 -0700
Committer: Shunxin <lu...@hotmail.com>
Committed: Mon Jul 25 16:41:34 2016 -0700

----------------------------------------------------------------------
 .../FunctionOperator/FunctionOperatorTest.java  | 332 +++++++++++++++++++
 1 file changed, 332 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5f9b88b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
new file mode 100644
index 0000000..34820b6
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.stream.FunctionOperator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.StramLocalCluster;
+
+
+/**
+ * Unit tests for FunctionOperator (Map, FlatMap, Filter).
+ */
+
+public class FunctionOperatorTest
+{
+  // Number of integers NumberGenerator emits for the map and filter tests.
+  private static final int NumTuples = 10;
+  // NumberListGenerator stops emitting once its counter reaches this value.
+  private static final int NumFlatMapTuples = 100;
+  // Modulus used by the filter tests: only multiples of this value pass.
+  private static final int divider = 2;
+  // List-size limit used by NumberListGenerator.
+  private static final int listSize = 10;
+  // Results accumulated by ResultCollector and read by the exit-condition
+  // callbacks and the assertions. Declared volatile because the collector
+  // presumably runs on a different thread than the readers in local mode
+  // (NOTE(review): confirm against the local cluster threading model).
+  private static volatile int TupleCount;
+  private static volatile int sum;
+
+  /**
+   * Sample input operator for FlatMap testing. Generates the consecutive
+   * integers 0 .. NumFlatMapTuples - 1 and emits them in lists holding at most
+   * listSize elements.
+   */
+  public static class NumberListGenerator extends BaseOperator implements InputOperator
+  {
+    // Next integer to generate.
+    private int numMem;
+    // Batch currently being filled; replaced (never reused) after it is emitted.
+    private List<Integer> nums;
+
+    public final transient DefaultOutputPort<List<Integer>> output = new DefaultOutputPort<List<Integer>>();
+
+    /**
+     * Resets the counter and starts with an empty batch.
+     */
+    @Override
+    public void setup(OperatorContext context)
+    {
+      numMem = 0;
+      nums = new ArrayList<Integer>();
+    }
+
+    /**
+     * Adds one integer to the current batch per call and emits the batch when
+     * it is full or when the last integer has been generated. Does nothing once
+     * NumFlatMapTuples integers have been produced.
+     */
+    @Override
+    public void emitTuples()
+    {
+      if (numMem >= NumFlatMapTuples) {
+        // Finished: do not keep growing the batch on further calls.
+        return;
+      }
+      nums.add(numMem);
+      numMem++;
+      if (nums.size() >= listSize || numMem == NumFlatMapTuples) {
+        output.emit(nums);
+        // Start a fresh list instead of clearing the emitted one, so a
+        // receiver holding the same reference never sees it emptied.
+        nums = new ArrayList<Integer>();
+      }
+    }
+  }
+
+  /**
+   * Sample input operator for filter and map testing. Emits the integers
+   * 0 .. NumTuples - 1, one per emitTuples() call, and nothing afterwards.
+   */
+  public static class NumberGenerator extends BaseOperator implements InputOperator
+  {
+    // Next integer to emit.
+    private int num;
+
+    public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
+
+    /**
+     * Emits the next integer, or does nothing once NumTuples values were sent.
+     */
+    @Override
+    public void emitTuples()
+    {
+      if (num >= NumTuples) {
+        return;
+      }
+      output.emit(num);
+      num += 1;
+    }
+
+    /**
+     * Restarts the sequence at zero.
+     */
+    @Override
+    public void setup(OperatorContext context)
+    {
+      num = 0;
+    }
+  }
+
+  /**
+   * Sink operator that counts the integers it receives and adds them up in the
+   * static TupleCount and sum fields of the enclosing test class.
+   */
+  public static class ResultCollector extends BaseOperator
+  {
+
+    public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
+    {
+      @Override
+      public void process(Integer in)
+      {
+        // Update sum before TupleCount: the exit conditions in this file poll
+        // TupleCount, so once the final count is observable the sum should
+        // already include the tuple that produced it.
+        sum += in;
+        TupleCount++;
+      }
+    };
+
+    /**
+     * Clears the shared counters so each test run starts from zero.
+     */
+    @Override
+    public void setup(OperatorContext context)
+    {
+      TupleCount = 0;
+      sum = 0;
+    }
+
+  }
+
+
+  public static class FmFunction implements Function.FlatMapFunction<List<Integer>, Integer>
+  {
+    @Override
+    public Iterable<Integer> f(List<Integer> input)
+    {
+      ArrayList<Integer> result = new ArrayList<Integer>();
+      for (int in : input) {
+        if (in % 13 == 0 || in % 17 == 0) {
+          result.add(in * in);
+        }
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Map function that returns the square of its input.
+   */
+  public static class Square implements Function.MapFunction<Integer, Integer>
+  {
+    @Override
+    public Integer f(Integer input)
+    {
+      final int value = input;
+      return value * value;
+    }
+  }
+
+
+  /**
+   * Wires NumberGenerator -> MapFunctionOperator(Square) -> ResultCollector
+   * directly on a DAG, runs it on a local cluster and checks the sum of the
+   * squared values.
+   */
+  @Test
+  public void testMapOperator() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator());
+    FunctionOperator.MapFunctionOperator<Integer, Integer> mapper
+        = dag.addOperator("mapper", new FunctionOperator.MapFunctionOperator<Integer, Integer>(new Square()));
+    ResultCollector collector = dag.addOperator("collector", new ResultCollector());
+
+    dag.addStream("raw numbers", numGen.output, mapper.input);
+    dag.addStream("mapped results", mapper.output, collector.input);
+
+    // Create local cluster
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+
+    // Exit once the collector has seen every generated tuple.
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return TupleCount == NumTuples;
+      }
+    });
+
+    lc.run(5000);
+
+    // 0^2 + 1^2 + ... + 9^2 = 285. JUnit's argument order is (expected, actual).
+    Assert.assertEquals(285, sum);
+  }
+
+  /**
+   * Same scenario as the DAG-based map test, but built through the ApexStream
+   * API: fromInput(...).map(Square) feeding a ResultCollector.
+   */
+  @Test
+  public void testMapOperatorStream() throws Exception
+  {
+    NumberGenerator numGen = new NumberGenerator();
+    ResultCollector collector = new ResultCollector();
+
+    ApexStream<Integer> nums = StreamFactory.fromInput(numGen, numGen.output)
+        .map(new Square());
+
+    // Exit once the collector has seen every generated tuple.
+    nums.addOperator(collector, collector.input, null)
+        .runEmbedded(false, 10000, new Callable<Boolean>()
+        {
+          @Override
+          public Boolean call() throws Exception
+          {
+            return TupleCount == NumTuples;
+          }
+        });
+
+    // 0^2 + 1^2 + ... + 9^2 = 285. JUnit's argument order is (expected, actual).
+    Assert.assertEquals(285, sum);
+  }
+
+
+  /**
+   * Wires NumberListGenerator -> FlatMapFunctionOperator(FmFunction) ->
+   * ResultCollector directly on a DAG, runs it on a local cluster and checks
+   * the sum of the flat-mapped values.
+   */
+  @Test
+  public void testFlatMapOperator() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    NumberListGenerator numGen = dag.addOperator("numGen", new NumberListGenerator());
+    FunctionOperator.FlatMapFunctionOperator<List<Integer>, Integer> fm
+        = dag.addOperator("flatmap", new FunctionOperator.FlatMapFunctionOperator<>(new FmFunction()));
+    ResultCollector collector = dag.addOperator("collector", new ResultCollector());
+
+    dag.addStream("raw numbers", numGen.output, fm.input);
+    dag.addStream("flatmap results", fm.output, collector.input);
+
+    // Create local cluster
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+
+    // The generated range holds 13 multiples of 13 or 17 (0 included), so
+    // FmFunction yields 13 tuples in total.
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return TupleCount == 13;
+      }
+    });
+
+    lc.run(5000);
+
+    // Sum of the squares of those 13 values. JUnit's argument order is
+    // (expected, actual).
+    Assert.assertEquals(39555, sum);
+  }
+
+  /**
+   * Same scenario as the DAG-based flatMap test, but built through the
+   * ApexStream API: fromInput(...).flatMap(FmFunction) feeding a
+   * ResultCollector.
+   */
+  @Test
+  public void testFlatMapOperatorStream() throws Exception
+  {
+    NumberListGenerator numGen = new NumberListGenerator();
+    ResultCollector collector = new ResultCollector();
+
+    ApexStream<Integer> numLists = StreamFactory.fromInput(numGen, numGen.output)
+        .flatMap(new FmFunction());
+    // The generated range holds 13 multiples of 13 or 17 (0 included), so
+    // FmFunction yields 13 tuples in total.
+    numLists.addOperator(collector, collector.input, null)
+        .runEmbedded(false, 10000, new Callable<Boolean>()
+        {
+          @Override
+          public Boolean call() throws Exception
+          {
+            return TupleCount == 13;
+          }
+        });
+    // Sum of the squares of those 13 values. JUnit's argument order is
+    // (expected, actual).
+    Assert.assertEquals(39555, sum);
+  }
+
+  /**
+   * Wires NumberGenerator -> FilterFunctionOperator -> ResultCollector
+   * directly on a DAG, runs it on a local cluster and checks the sum of the
+   * values that pass the filter.
+   */
+  @Test
+  public void testFilterOperator() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    // Keeps only the multiples of divider.
+    FunctionOperator.FilterFunctionOperator<Integer> filter0
+        = new FunctionOperator.FilterFunctionOperator<Integer>(new Function.FilterFunction<Integer>()
+        {
+          @Override
+          public Boolean f(Integer in)
+          {
+            return in % divider == 0;
+          }
+        });
+
+    NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator());
+    FunctionOperator.FilterFunctionOperator<Integer> filter = dag.addOperator("filter", filter0);
+    ResultCollector collector = dag.addOperator("collector", new ResultCollector());
+
+    dag.addStream("raw numbers", numGen.output, filter.input);
+    dag.addStream("filtered results", filter.output, collector.input);
+
+    // Create local cluster
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+
+    // Of the values 0 .. NumTuples - 1, NumTuples / divider pass the filter.
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return TupleCount == NumTuples / divider;
+      }
+    });
+
+    lc.run(5000);
+    // 0 + 2 + 4 + 6 + 8 = 20. JUnit's argument order is (expected, actual).
+    Assert.assertEquals(20, sum);
+  }
+
+  /**
+   * Same scenario as the DAG-based filter test, but built through the
+   * ApexStream API: fromInput(...).filter(...) feeding a ResultCollector.
+   */
+  @Test
+  public void testFilterOperatorStream() throws Exception
+  {
+    NumberGenerator numGen = new NumberGenerator();
+    ResultCollector collector = new ResultCollector();
+
+    // Keeps only the multiples of divider.
+    ApexStream<Integer> nums = StreamFactory.fromInput(numGen, numGen.output)
+        .filter(new Function.FilterFunction<Integer>()
+        {
+          @Override
+          public Boolean f(Integer in)
+          {
+            return in % divider == 0;
+          }
+        });
+
+    // Of the values 0 .. NumTuples - 1, NumTuples / divider pass the filter.
+    nums.addOperator(collector, collector.input, null)
+        .runEmbedded(false, 10000, new Callable<Boolean>()
+        {
+          @Override
+          public Boolean call() throws Exception
+          {
+            return TupleCount == NumTuples / divider;
+          }
+        });
+
+    // 0 + 2 + 4 + 6 + 8 = 20. JUnit's argument order is (expected, actual).
+    Assert.assertEquals(20, sum);
+  }
+
+
+}


[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2149' of github.com:ShunxinLu/apex-malhar

Posted by hs...@apache.org.
Merge branch 'APEXMALHAR-2149' of github.com:ShunxinLu/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d06b2d98
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d06b2d98
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d06b2d98

Branch: refs/heads/master
Commit: d06b2d9871830be680dfb7ce2a2deef08ae0b815
Parents: 2d48548 b5f9b88
Author: Siyuan Hua <hs...@apache.org>
Authored: Tue Jul 26 09:16:09 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Tue Jul 26 09:16:09 2016 -0700

----------------------------------------------------------------------
 .../FunctionOperator/FunctionOperatorTest.java  | 332 +++++++++++++++++++
 1 file changed, 332 insertions(+)
----------------------------------------------------------------------