You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/05/24 21:41:00 UTC

incubator-apex-malhar git commit: APEXMALHAR-2082 Data Filter Operator - Added Data Filter Operator - Marked it as Evolving - Added test cases around variations

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 5a4b59ffa -> 9c11400a2


APEXMALHAR-2082 Data Filter Operator
 - Added Data Filter Operator
 - Marked it as Evolving
 - Added test cases around variations


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9c11400a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9c11400a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9c11400a

Branch: refs/heads/master
Commit: 9c11400a21fe78c1709933293dea9cad1490f887
Parents: 5a4b59f
Author: Pradeep A. Dalvi <pr...@datatorrent.com>
Authored: Wed May 11 21:36:02 2016 +0530
Committer: Pradeep A. Dalvi <gi...@pradeepdalvi.com>
Committed: Tue May 24 12:38:14 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/lib/filter/FilterOperator.java  | 213 +++++++++++++++++++
 .../datatorrent/lib/filter/FilterAppTest.java   | 138 ++++++++++++
 .../com/datatorrent/lib/filter/FilterTest.java  | 188 ++++++++++++++++
 3 files changed, 539 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9c11400a/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java b/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java
new file mode 100644
index 0000000..8d61d00
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.filter;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.expression.Expression;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * <b>FilterOperator</b>
+ * Filter Operator routes each incoming POJO to one of its output ports depending
+ * on whether a boolean condition holds for it.
+ *
+ * <b>Parameters</b>
+ * - condition: condition based on expression language,
+ *   e.g. {@code (({$}.getNum() % 10) == 0)}
+ * - expressionFunctions: qualified classes/methods made statically available to
+ *   the expression (the constructor adds a default set)
+ *
+ * <b>Input Port</b> takes POJOs as an input; the POJO class is read from the
+ * TUPLE_CLASS attribute of the input port.
+ *
+ * <b>Output Ports</b>
+ * - truePort emits POJOs meeting the given condition
+ * - falsePort emits POJOs not meeting the given condition
+ * - error emits POJOs for which the condition could not be evaluated (evaluation
+ *   threw an exception or produced null)
+ */
+@InterfaceStability.Evolving
+public class FilterOperator extends BaseOperator implements Operator.ActivationListener
+{
+  private String condition;
+  private List<String> expressionFunctions = new LinkedList<>();
+
+  // transient: taken from the input port's TUPLE_CLASS attribute in port setup
+  private transient Class<?> inClazz = null;
+  // transient: compiled from condition in createExpression(), called on every activate()
+  private transient Expression<Boolean> expr = null;
+
+  /** Number of tuples emitted on truePort in the current window. */
+  @AutoMetric
+  private long trueTuples;
+
+  /** Number of tuples emitted on falsePort in the current window. */
+  @AutoMetric
+  private long falseTuples;
+
+  /** Number of tuples emitted on the error port in the current window. */
+  @AutoMetric
+  private long errorTuples;
+
+  public final transient DefaultOutputPort<Object> truePort = new DefaultOutputPort<Object>();
+
+  public final transient DefaultOutputPort<Object> falsePort = new DefaultOutputPort<Object>();
+
+  public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<Object>();
+
+  /**
+   * Creates the operator with a default set of statically imported expression
+   * functions (java.lang.Math and several commons-lang3 utility classes).
+   */
+  public FilterOperator()
+  {
+    expressionFunctions.add("java.lang.Math.*");
+    expressionFunctions.add("org.apache.commons.lang3.StringUtils.*");
+    expressionFunctions.add("org.apache.commons.lang3.StringEscapeUtils.*");
+    expressionFunctions.add("org.apache.commons.lang3.time.DurationFormatUtils.*");
+    expressionFunctions.add("org.apache.commons.lang3.time.DateFormatUtils.*");
+  }
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(PortContext context)
+    {
+      inClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object t)
+    {
+      processTuple(t);
+    }
+  };
+
+  @Override
+  public void activate(Context context)
+  {
+    createExpression();
+  }
+
+  @Override
+  public void deactivate()
+  {
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    // the metrics are per-window counts
+    errorTuples = trueTuples = falseTuples = 0;
+  }
+
+  /**
+   * createExpression: create an expression from condition of POJO fields
+   * Override this function for custom field expressions
+   *
+   * @throws IllegalStateException if no condition has been set
+   */
+  protected void createExpression()
+  {
+    if (condition == null) {
+      throw new IllegalStateException("FilterOperator condition must be set before activation");
+    }
+    logger.info("Creating an expression for condition {}", condition);
+    expr = PojoUtils.createExpression(inClazz, condition, Boolean.class,
+        expressionFunctions.toArray(new String[expressionFunctions.size()]));
+  }
+
+  /**
+   * evalExpression: Evaluate condition/expression
+   * Override this function for custom condition evaluation
+   *
+   * @param t incoming tuple
+   * @return result of the condition for {@code t}; a null result is routed to the error port
+   */
+  protected Boolean evalExpression(Object t)
+  {
+    return expr.execute(t);
+  }
+
+  /**
+   * processTuple: emit POJO meeting condition on truePort,
+   * POJO not meeting it on falsePort, and POJO whose condition
+   * could not be evaluated on the error port.
+   */
+  private void processTuple(Object t)
+  {
+    Boolean result;
+    // Only the evaluation is guarded: emit() may run downstream logic synchronously
+    // (e.g. inline sinks), and a failure there must neither be reported as an
+    // expression error nor cause the same tuple to be emitted a second time on error.
+    try {
+      result = evalExpression(t);
+    } catch (Exception ex) {
+      logger.error("Error in expression eval: {}", ex.getMessage());
+      logger.debug("Exception: ", ex);
+      emitError(t);
+      return;
+    }
+
+    if (result == null) {
+      // cannot be classified as true/false; previously this surfaced as an unboxing NPE
+      logger.error("Error in expression eval: condition {} evaluated to null", condition);
+      emitError(t);
+    } else if (result) {
+      truePort.emit(t);
+      trueTuples++;
+    } else {
+      falsePort.emit(t);
+      falseTuples++;
+    }
+  }
+
+  /** Emits a tuple whose condition could not be evaluated on the error port. */
+  private void emitError(Object t)
+  {
+    error.emit(t);
+    errorTuples++;
+  }
+
+  /**
+   * Returns condition/expression with which Filtering is done
+   *
+   * @return condition parameter of Filter Operator
+   */
+  public String getCondition()
+  {
+    return condition;
+  }
+
+  /**
+   * Set condition/expression with which Filtering operation would be applied.
+   * The expression is compiled when the operator is activated.
+   *
+   * @param condition parameter of Filter Operator
+   */
+  public void setCondition(String condition)
+  {
+    logger.info("Changing condition from {} to {}", this.condition, condition);
+    this.condition = condition;
+  }
+
+  /**
+   * Returns the list of expression functions which are made available to the
+   * expression.
+   *
+   * @return List of functions available in expression.
+   */
+  public List<String> getExpressionFunctions()
+  {
+    return expressionFunctions;
+  }
+
+  /**
+   * Set list of import classes/methods that should be made statically available
+   * to the expression.
+   * For ex. org.apache.apex.test1.Test would mean that the "Test" method will be
+   * available in the expression to be used directly.
+   * This is an optional property. See constructor to see defaults that are included.
+   *
+   * @param expressionFunctions List of qualified class/method that needs to be
+   *                            imported to expression.
+   */
+  public void setExpressionFunctions(List<String> expressionFunctions)
+  {
+    this.expressionFunctions = expressionFunctions;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(FilterOperator.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9c11400a/library/src/test/java/com/datatorrent/lib/filter/FilterAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/filter/FilterAppTest.java b/library/src/test/java/com/datatorrent/lib/filter/FilterAppTest.java
new file mode 100644
index 0000000..3feb899
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/filter/FilterAppTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.filter;
+
+import java.util.Random;
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Application Test for Filter Operator.
+ */
+public class FilterAppTest
+{
+  @Test
+  public void testFilterApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new Application(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.run(10000); // runs for 10 seconds and quits
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  public static class Application implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration configuration)
+    {
+      DummyInputGenerator input = dag.addOperator("Input", new DummyInputGenerator());
+      FilterOperator filter = dag.addOperator("Filter", new FilterOperator());
+
+      filter.setCondition("(({$}.getNum() % 10) == 0)");
+
+      ConsoleOutputOperator trueConsole = dag.addOperator("TrueConsole", new ConsoleOutputOperator());
+      trueConsole.setSilent(true);
+      ConsoleOutputOperator falseConsole = dag.addOperator("FalseConsole", new ConsoleOutputOperator());
+      falseConsole.setSilent(true);
+      ConsoleOutputOperator errorConsole = dag.addOperator("ErrorConsole", new ConsoleOutputOperator());
+      errorConsole.setSilent(true);
+
+      dag.getMeta(filter).getMeta(filter.input).getAttributes().put(Context.PortContext.TUPLE_CLASS, DummyPOJO.class);
+
+      dag.addStream("Connect", input.output, filter.input);
+
+      dag.addStream("ConditionTrue", filter.truePort, trueConsole.input);
+      dag.addStream("ConditionFalse", filter.falsePort, falseConsole.input);
+      dag.addStream("ConditionError", filter.error, errorConsole.input);
+    }
+  }
+
+  public static class DummyPOJO
+  {
+    private int num;
+
+    public DummyPOJO()
+    {
+      //for kryo
+    }
+
+    public DummyPOJO(int num)
+    {
+      this.num = num;
+    }
+
+    public int getNum()
+    {
+      return num;
+    }
+
+    public void setNum(int num)
+    {
+      this.num = num;
+    }
+  }
+
+  public static class DummyInputGenerator implements InputOperator
+  {
+    public final transient DefaultOutputPort<DummyPOJO> output = new DefaultOutputPort<>();
+    Random randomGenerator = new Random();
+
+    @Override
+    public void emitTuples()
+    {
+      output.emit(new DummyPOJO(randomGenerator.nextInt(1000)));
+    }
+
+    @Override
+    public void beginWindow(long l)
+    {
+    }
+
+    @Override
+    public void endWindow()
+    {
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9c11400a/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java b/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java
new file mode 100644
index 0000000..ba32942
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.filter;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.testbench.CountTestSink;
+import com.datatorrent.stram.engine.PortContext;
+
+/**
+ * Tests for FilterOperator: each test activates the operator with a condition,
+ * pushes a tuple and checks which sink (true/false/error) received it.
+ */
+public class FilterTest
+{
+  /** POJO exposing its field through a getter/setter. */
+  public static class DummyPrivatePOJO
+  {
+    private long val;
+
+    public long getVal()
+    {
+      return val;
+    }
+
+    public void setVal(long val)
+    {
+      this.val = val;
+    }
+  }
+
+  /** POJO exposing its field directly. */
+  public static class DummyPublicPOJO
+  {
+    public long val;
+  }
+
+
+  // Fixtures are created once in setup() and shared by all tests; cleanup() resets them.
+  private static FilterOperator filter;
+  private static DummyPrivatePOJO data;
+  private static DummyPublicPOJO pdata;
+
+  private static CountTestSink<Object> trueSink;
+  private static CountTestSink<Object> falseSink;
+  private static CountTestSink<Object> errorSink;
+
+  public void clearSinks()
+  {
+    trueSink.clear();
+    falseSink.clear();
+    errorSink.clear();
+  }
+
+  /**
+   * Wires the sinks, sets up the operator and its input port for {@code inClass},
+   * and activates it with {@code condition}.
+   */
+  public void prepareFilterOperator(Class<?> inClass, String condition)
+  {
+    filter.truePort.setSink(trueSink);
+    filter.falsePort.setSink(falseSink);
+    filter.error.setSink(errorSink);
+
+    filter.setup(null);
+
+    Attribute.AttributeMap in = new Attribute.AttributeMap.DefaultAttributeMap();
+    in.put(Context.PortContext.TUPLE_CLASS, inClass);
+    filter.input.setup(new PortContext(in, null));
+
+    filter.setCondition(condition);
+
+    filter.activate(null);
+  }
+
+  public void clearFilterOperator()
+  {
+    clearSinks();
+
+    filter.deactivate();
+    filter.teardown();
+  }
+
+  /**
+   * Runs after every test, including failing ones, so that sink counts and operator
+   * state cannot leak from one test into the next through the shared fixtures.
+   */
+  @org.junit.After
+  public void cleanup()
+  {
+    clearFilterOperator();
+  }
+
+  @Test
+  public void testFilterPrivate()
+  {
+    prepareFilterOperator(DummyPrivatePOJO.class, "({$}.getVal() == 100)");
+
+    /* when condition is true */
+    filter.beginWindow(0);
+
+    data.setVal(100);
+    filter.input.put(data);
+    Assert.assertEquals("true condition true tuples", 1, trueSink.getCount());
+    Assert.assertEquals("true condition false tuples", 0, falseSink.getCount());
+    Assert.assertEquals("true condition error tuples", 0, errorSink.getCount());
+
+    filter.endWindow();
+
+    clearSinks();
+
+    /* when condition is not true */
+    filter.beginWindow(1);
+
+    data.setVal(1000);
+    filter.input.put(data);
+    Assert.assertEquals("false condition true tuples", 0, trueSink.getCount());
+    Assert.assertEquals("false condition false tuples", 1, falseSink.getCount());
+    Assert.assertEquals("false condition error tuples", 0, errorSink.getCount());
+
+    filter.endWindow();
+  }
+
+  @Test
+  public void testFilterPublic()
+  {
+    prepareFilterOperator(DummyPublicPOJO.class, "({$}.val == 100)");
+
+    /* when condition is true */
+    filter.beginWindow(0);
+
+    pdata.val = 100;
+    filter.input.put(pdata);
+    Assert.assertEquals("true condition true tuples", 1, trueSink.getCount());
+    Assert.assertEquals("true condition false tuples", 0, falseSink.getCount());
+    Assert.assertEquals("true condition error tuples", 0, errorSink.getCount());
+
+    filter.endWindow();
+
+    clearSinks();
+
+    /* when condition is not true */
+    filter.beginWindow(1);
+
+    pdata.val = 1000;
+    filter.input.put(pdata);
+    Assert.assertEquals("false condition true tuples", 0, trueSink.getCount());
+    Assert.assertEquals("false condition false tuples", 1, falseSink.getCount());
+    Assert.assertEquals("false condition error tuples", 0, errorSink.getCount());
+
+    filter.endWindow();
+  }
+
+  @Test
+  public void testFilterError()
+  {
+    // The expression is compiled for DummyPublicPOJO but a DummyPrivatePOJO is sent,
+    // so evaluation fails and the tuple must go to the error port.
+    prepareFilterOperator(DummyPublicPOJO.class, "({$}.val == 1)");
+
+    filter.beginWindow(0);
+
+    filter.input.put(data);
+    Assert.assertEquals("error condition true tuples", 0, trueSink.getCount());
+    Assert.assertEquals("error condition false tuples", 0, falseSink.getCount());
+    Assert.assertEquals("error condition error tuples", 1, errorSink.getCount());
+
+    filter.endWindow();
+  }
+
+  @BeforeClass
+  public static void setup()
+  {
+    data = new DummyPrivatePOJO();
+    pdata = new DummyPublicPOJO();
+    filter = new FilterOperator();
+
+    trueSink = new CountTestSink<>();
+    falseSink = new CountTestSink<>();
+    errorSink = new CountTestSink<>();
+  }
+}