You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/05/24 21:41:00 UTC
incubator-apex-malhar git commit: APEXMALHAR-2082 Data Filter
Operator - Added Data Filter Operator - Marked it as Evolving - Added test
cases around variations
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/master 5a4b59ffa -> 9c11400a2
APEXMALHAR-2082 Data Filter Operator
- Added Data Filter Operator
- Marked it as Evolving
- Added test cases around variations
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9c11400a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9c11400a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9c11400a
Branch: refs/heads/master
Commit: 9c11400a21fe78c1709933293dea9cad1490f887
Parents: 5a4b59f
Author: Pradeep A. Dalvi <pr...@datatorrent.com>
Authored: Wed May 11 21:36:02 2016 +0530
Committer: Pradeep A. Dalvi <gi...@pradeepdalvi.com>
Committed: Tue May 24 12:38:14 2016 -0700
----------------------------------------------------------------------
.../datatorrent/lib/filter/FilterOperator.java | 213 +++++++++++++++++++
.../datatorrent/lib/filter/FilterAppTest.java | 138 ++++++++++++
.../com/datatorrent/lib/filter/FilterTest.java | 188 ++++++++++++++++
3 files changed, 539 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9c11400a/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java b/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java
new file mode 100644
index 0000000..8d61d00
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.filter;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.expression.Expression;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * <b>FilterOperator</b>
+ * Filter Operator routes each incoming tuple to an output port based on the defined condition
+ *
+ * <b>Parameters</b>
+ * - condition: boolean condition, based on expression language, evaluated for every tuple
+ *
+ * <b>Input Port</b> takes POJOs as an input; their class is read from the TUPLE_CLASS port attribute
+ *
+ * <b>Output Ports</b>
+ * - truePort emits POJOs meeting the given condition
+ * - falsePort emits POJOs not meeting the given condition
+ * - error port emits POJOs for which evaluating the expression threw an exception
+ *
+ */
+@InterfaceStability.Evolving
+public class FilterOperator extends BaseOperator implements Operator.ActivationListener
+{
+ private String condition;
+ private List<String> expressionFunctions = new LinkedList<>();
+
+ private transient Class<?> inClazz = null; // tuple class, set in the input port's setup()
+ private transient Expression<Boolean> expr = null; // built from condition in createExpression()
+
+ @AutoMetric
+ private long trueTuples; // tuples emitted on truePort; reset in beginWindow()
+
+ @AutoMetric
+ private long falseTuples; // tuples emitted on falsePort; reset in beginWindow()
+
+ @AutoMetric
+ private long errorTuples; // tuples emitted on error port; reset in beginWindow()
+
+ public final transient DefaultOutputPort<Object> truePort = new DefaultOutputPort<Object>();
+
+ public final transient DefaultOutputPort<Object> falsePort = new DefaultOutputPort<Object>();
+
+ public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<Object>();
+
+ public FilterOperator()
+ { // default entries of expressionFunctions; setExpressionFunctions() replaces the whole list
+ expressionFunctions.add("java.lang.Math.*");
+ expressionFunctions.add("org.apache.commons.lang3.StringUtils.*");
+ expressionFunctions.add("org.apache.commons.lang3.StringEscapeUtils.*");
+ expressionFunctions.add("org.apache.commons.lang3.time.DurationFormatUtils.*");
+ expressionFunctions.add("org.apache.commons.lang3.time.DateFormatUtils.*");
+ }
+
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+ {
+ public void setup(PortContext context) // NOTE(review): looks like an override of the port's setup hook; no @Override present
+ {
+ inClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+
+ @Override
+ public void process(Object t)
+ {
+ processTuple(t);
+ }
+ };
+
+ @Override
+ public void activate(Context context)
+ {
+ createExpression(); // uses inClazz and condition as they are at activation time
+ }
+
+ @Override
+ public void deactivate()
+ {
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ errorTuples = trueTuples = falseTuples = 0; // the three metrics count tuples of the current window only
+ }
+
+ /**
+ * createExpression: create an expression from condition of POJO fields
+ * Override this function for custom field expressions
+ */
+ protected void createExpression()
+ {
+ logger.info("Creating an expression for condition {}", condition);
+ expr = PojoUtils.createExpression(inClazz, condition, Boolean.class,
+ expressionFunctions.toArray(new String[expressionFunctions.size()]));
+ }
+
+ /**
+ * evalExpression: Evaluate condition/expression
+ * Override this function for custom condition evaluation
+ */
+ protected Boolean evalExpression(Object t)
+ {
+ return expr.execute(t);
+ }
+
+ /**
+ * processTuple: emit POJO meeting condition on truePort, POJO not meeting it on falsePort,
+ * and POJO whose evaluation threw an exception on the error port
+ */
+ private void processTuple(Object t)
+ {
+ try {
+ if (evalExpression(t)) {
+ truePort.emit(t);
+ trueTuples++;
+ } else {
+ falsePort.emit(t);
+ falseTuples++;
+ }
+ } catch (Exception ex) {
+ logger.error("Error in expression eval: {}", ex.getMessage());
+ logger.debug("Exception: ", ex); // stack trace is logged at debug level only
+ error.emit(t);
+ errorTuples++;
+ }
+ }
+
+ /**
+ * Returns condition/expression with which Filtering is done
+ *
+ * @return condition parameter of Filter Operator
+ */
+ public String getCondition()
+ {
+ return condition;
+ }
+
+ /**
+ * Set condition/expression with which Filtering operation would be applied
+ * Only stores the value; the expression itself is built in createExpression(), called from activate()
+ * @param condition parameter of Filter Operator
+ */
+ public void setCondition(String condition)
+ {
+ logger.info("Changing condition from {} to {}", this.condition, condition);
+ this.condition = condition;
+ }
+
+ /**
+ * Returns the list of expression functions which would be made available to
+ * expression to use.
+ *
+ * @return List of functions available in expression.
+ */
+ public List<String> getExpressionFunctions()
+ {
+ return expressionFunctions;
+ }
+
+ /**
+ * Set list of import classes/methods that should be made statically available
+ * to expression to use.
+ * For ex. org.apache.apex.test1.Test would mean that "Test" method will be
+ * available in the expression to be used directly.
+ * This is an optional property. See constructor to see defaults that are included.
+ *
+ * @param expressionFunctions List of qualified class/method that needs to be
+ * imported to expression.
+ */
+ public void setExpressionFunctions(List<String> expressionFunctions)
+ {
+ this.expressionFunctions = expressionFunctions;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(FilterOperator.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9c11400a/library/src/test/java/com/datatorrent/lib/filter/FilterAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/filter/FilterAppTest.java b/library/src/test/java/com/datatorrent/lib/filter/FilterAppTest.java
new file mode 100644
index 0000000..3feb899
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/filter/FilterAppTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.filter;
+
+import java.util.Random;
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Application test for FilterOperator: runs a small DAG in local mode and fails only on constraint violations.
+ */
+public class FilterAppTest
+{
+ @Test
+ public void testFilterApplication() throws Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ lma.prepareDAG(new Application(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(10000); // runs for 10 seconds and quits
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+ public static class Application implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration configuration)
+ {
+ DummyInputGenerator input = dag.addOperator("Input", new DummyInputGenerator());
+ FilterOperator filter = dag.addOperator("Filter", new FilterOperator());
+
+ filter.setCondition("(({$}.getNum() % 10) == 0)"); // holds when getNum() is divisible by 10
+
+ ConsoleOutputOperator trueConsole = dag.addOperator("TrueConsole", new ConsoleOutputOperator());
+ trueConsole.setSilent(true);
+ ConsoleOutputOperator falseConsole = dag.addOperator("FalseConsole", new ConsoleOutputOperator());
+ falseConsole.setSilent(true);
+ ConsoleOutputOperator errorConsole = dag.addOperator("ErrorConsole", new ConsoleOutputOperator());
+ errorConsole.setSilent(true);
+
+ dag.getMeta(filter).getMeta(filter.input).getAttributes().put(Context.PortContext.TUPLE_CLASS, DummyPOJO.class); // FilterOperator reads this attribute in its input port's setup()
+
+ dag.addStream("Connect", input.output, filter.input);
+
+ dag.addStream("ConditionTrue", filter.truePort, trueConsole.input);
+ dag.addStream("ConditionFalse", filter.falsePort, falseConsole.input);
+ dag.addStream("ConditionError", filter.error, errorConsole.input);
+ }
+ }
+
+ public static class DummyPOJO
+ {
+ private int num;
+
+ public DummyPOJO()
+ {
+ // for kryo
+ }
+
+ public DummyPOJO(int num)
+ {
+ this.num = num;
+ }
+
+ public int getNum()
+ {
+ return num;
+ }
+
+ public void setNum(int num)
+ {
+ this.num = num;
+ }
+ }
+
+ public static class DummyInputGenerator implements InputOperator
+ {
+ public final transient DefaultOutputPort<DummyPOJO> output = new DefaultOutputPort<>();
+ Random randomGenerator = new Random();
+
+ @Override
+ public void emitTuples()
+ {
+ output.emit(new DummyPOJO(randomGenerator.nextInt(1000))); // one tuple per call, num in [0, 1000)
+ }
+
+ @Override
+ public void beginWindow(long l)
+ {
+ }
+
+ @Override
+ public void endWindow()
+ {
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9c11400a/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java b/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java
new file mode 100644
index 0000000..ba32942
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.filter;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.testbench.CountTestSink;
+import com.datatorrent.stram.engine.PortContext;
+
+/**
+ * Unit tests for FilterOperator, with a CountTestSink attached to each of its three output ports
+ */
+public class FilterTest
+{
+ public static class DummyPrivatePOJO
+ {
+ private long val;
+
+ public long getVal()
+ {
+ return val;
+ }
+
+ public void setVal(long val)
+ {
+ this.val = val;
+ }
+ }
+
+ public static class DummyPublicPOJO
+ {
+ public long val;
+ }
+
+
+ private static FilterOperator filter; // static fixtures: created once in setup() and shared by all tests
+ private static DummyPrivatePOJO data;
+ private static DummyPublicPOJO pdata;
+
+ private static CountTestSink<Object> trueSink;
+ private static CountTestSink<Object> falseSink;
+ private static CountTestSink<Object> errorSink;
+
+ public void clearSinks()
+ {
+ trueSink.clear();
+ falseSink.clear();
+ errorSink.clear();
+ }
+
+ public void prepareFilterOperator(Class<?> inClass, String condition)
+ {
+ filter.truePort.setSink(trueSink);
+ filter.falsePort.setSink(falseSink);
+ filter.error.setSink(errorSink);
+
+ filter.setup(null);
+
+ Attribute.AttributeMap in = new Attribute.AttributeMap.DefaultAttributeMap();
+ in.put(Context.PortContext.TUPLE_CLASS, inClass);
+ filter.input.setup(new PortContext(in, null)); // passes inClass to the operator through TUPLE_CLASS
+
+ filter.setCondition(condition);
+
+ filter.activate(null); // activate() builds the expression, so the condition must be set before it
+ }
+
+ public void clearFilterOperator()
+ {
+ clearSinks();
+
+ filter.deactivate();
+ filter.teardown();
+ }
+
+ @Test
+ public void testFilterPrivate()
+ {
+ prepareFilterOperator(DummyPrivatePOJO.class, "({$}.getVal() == 100)");
+
+ filter.beginWindow(0); /* when condition is true */
+
+ data.setVal(100);
+ filter.input.put(data);
+ Assert.assertEquals("true condition true tuples", 1, trueSink.getCount());
+ Assert.assertEquals("true condition false tuples", 0, falseSink.getCount());
+ Assert.assertEquals("true condition error tuples", 0, errorSink.getCount());
+
+ filter.endWindow();
+
+ clearSinks();
+
+ /* when condition is not true */
+ filter.beginWindow(1);
+
+ data.setVal(1000);
+ filter.input.put(data);
+ Assert.assertEquals("false condition true tuples", 0, trueSink.getCount());
+ Assert.assertEquals("false condition false tuples", 1, falseSink.getCount());
+ Assert.assertEquals("false condition error tuples", 0, errorSink.getCount());
+
+ filter.endWindow();
+
+ clearFilterOperator();
+ }
+
+ @Test
+ public void testFilterPublic()
+ {
+ prepareFilterOperator(DummyPublicPOJO.class, "({$}.val == 100)");
+
+ /* when condition is true */
+ filter.beginWindow(0);
+
+ pdata.val = 100;
+ filter.input.put(pdata);
+ Assert.assertEquals("true condition true tuples", 1, trueSink.getCount());
+ Assert.assertEquals("true condition false tuples", 0, falseSink.getCount());
+ Assert.assertEquals("true condition error tuples", 0, errorSink.getCount());
+
+ filter.endWindow();
+
+ clearSinks();
+
+ /* when condition is not true */
+ filter.beginWindow(1);
+
+ pdata.val = 1000;
+ filter.input.put(pdata);
+ Assert.assertEquals("false condition true tuples", 0, trueSink.getCount());
+ Assert.assertEquals("false condition false tuples", 1, falseSink.getCount());
+ Assert.assertEquals("false condition error tuples", 0, errorSink.getCount());
+
+ filter.endWindow();
+
+ clearFilterOperator();
+ }
+
+ @Test
+ public void testFilterError()
+ {
+ prepareFilterOperator(DummyPublicPOJO.class, "({$}.val == 1)");
+
+ filter.beginWindow(0);
+
+ filter.input.put(data); // data is a DummyPrivatePOJO while the expression was built for DummyPublicPOJO
+ Assert.assertEquals("error condition true tuples", 0, trueSink.getCount());
+ Assert.assertEquals("error condition false tuples", 0, falseSink.getCount());
+ Assert.assertEquals("error condition error tuples", 1, errorSink.getCount());
+
+ filter.endWindow();
+
+ clearFilterOperator();
+ }
+
+ @BeforeClass
+ public static void setup()
+ {
+ data = new DummyPrivatePOJO();
+ pdata = new DummyPublicPOJO();
+ filter = new FilterOperator();
+
+ trueSink = new CountTestSink<>();
+ falseSink = new CountTestSink<>();
+ errorSink = new CountTestSink<>();
+ }
+}