You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by cs...@apache.org on 2015/11/22 06:23:31 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1812 implement anti join
operator
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 228d46cec -> 217f8db47
MLHR-1812 implement anti join operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9dac355a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9dac355a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9dac355a
Branch: refs/heads/devel-3
Commit: 9dac355a4e8eb4185e9d973db11bfbf408a0dc4f
Parents: 5b9eff8
Author: Dongming Liang <do...@capitalone.com>
Authored: Fri Nov 20 16:12:18 2015 -0800
Committer: Dongming Liang <do...@capitalone.com>
Committed: Fri Nov 20 16:12:18 2015 -0800
----------------------------------------------------------------------
.../datatorrent/lib/join/AntiJoinOperator.java | 203 +++++++++++++++++++
.../lib/join/AntiJoinOperatorTest.java | 119 +++++++++++
2 files changed, 322 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9dac355a/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java
new file mode 100644
index 0000000..382a0d6
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.index.Index;
+
+
+/**
+ * An implementation of Operator that reads table row data from two table data input ports. <br>
+ * <p>
+ * Operator anti-joins row on given condition and selected names, emits
+ * anti-joined result at output port.
+ * <br>
+ * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br>
+ * <b>Partitions : No, </b> will yield wrong result(s). <br>
+ * <br>
+ * <b>Ports : </b> <br>
+ * <b> inport1 : </b> Input port for table 1, expects HashMap<String, Object> <br>
+ * <b> inport2 : </b> Input port for table 2, expects HashMap<String, Object> <br>
+ * <b> outport : </b> Output anti-joined row port, emits HashMap<String, ArrayList<Object>> <br>
+ * <br>
+ * <b> Properties : </b>
+ * <b> joinCondition : </b> Join condition for table rows. <br>
+ * <b> table1Columns : </b> Columns to be selected from table1. <br>
+ * <b> table2Columns : </b> Columns to be selected from table2. <br>
+ * <br>
+ *
+ * @displayName Anti join
+ * @category Stream Manipulators
+ * @tags sql, anti join operator
+ * @since 0.3.3
+ */
+@OperatorAnnotation(partitionable = false)
+@Evolving
+public class AntiJoinOperator implements Operator
+{
+
+ /**
+ * Join Condition;
+ */
+ private Condition joinCondition;
+
+ /**
+ * Table1 select columns.
+ * Note: only left table (Table1) will be output in an Anti-join
+ */
+ private ArrayList<Index> table1Columns = new ArrayList<>();
+
+ /**
+ * Collect data rows from input port 1.
+ */
+ private List<Map<String, Object>> table1;
+
+ /**
+ * Collect data from input port 2.
+ */
+ private List<Map<String, Object>> table2;
+
+ /**
+ * Input port 1 that takes a map of <string,object>.
+ */
+ public final transient DefaultInputPort<Map<String, Object>> inport1 = new DefaultInputPort<Map<String, Object>>()
+ {
+ @Override
+ public void process(Map<String, Object> tuple)
+ {
+ table1.add(tuple);
+ for (int j = 0; j < table2.size(); j++) {
+ if ((joinCondition != null) && (joinCondition.isValidJoin(tuple, table2.get(j)))) {
+ table1.remove(tuple);
+ }
+ }
+ }
+ };
+
+ /**
+ * Input port 2 that takes a map of <string,object>.
+ */
+ public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>()
+ {
+ @Override
+ public void process(Map<String, Object> tuple)
+ {
+ table2.add(tuple);
+
+ for (int j = 0; j < table1.size(); j++) {
+ if ((joinCondition != null)
+ && (joinCondition.isValidJoin(table1.get(j), tuple))) {
+ table1.remove(table1.get(j));
+ }
+ }
+ }
+ };
+
+ /**
+ * Output port that emits a map of <string,object>.
+ */
+ public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<>();
+
+ @Override
+ public void setup(OperatorContext arg0)
+ {
+ table1 = new ArrayList<>();
+ table2 = new ArrayList<>();
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+
+ @Override
+ public void beginWindow(long arg0)
+ {
+ }
+
+ @Override
+ public void endWindow()
+ {
+ /* All joined rows have been removed
+ * The ones left are the anti-joined result
+ */
+ for (int i = 0; i < table1.size(); i++) {
+ joinRows(table1.get(i));
+ }
+
+ table1.clear();
+ table2.clear();
+ }
+
+ /**
+ * @return the joinCondition
+ */
+ public Condition getJoinCondition()
+ {
+ return joinCondition;
+ }
+
+ /**
+ * Pick the supported condition. Currently only equal join is supported.
+ *
+ * @param joinCondition - join condition
+ */
+ public void setJoinCondition(Condition joinCondition)
+ {
+ this.joinCondition = joinCondition;
+ }
+
+ /**
+ * Select table1 column name.
+ */
+ public void selectTable1Column(Index column)
+ {
+ table1Columns.add(column);
+ }
+
+ /**
+ * Join row from table1 and table2.
+ */
+ protected void joinRows(Map<String, Object> row)
+ {
+ // joined row
+ Map<String, Object> join = new HashMap<>();
+
+ // filter table1 columns
+ if (row != null) {
+ for (Index index : table1Columns) {
+ index.filter(row, join);
+ }
+ }
+
+ // emit row
+ outport.emit(join);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9dac355a/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java
new file mode 100644
index 0000000..4241a80
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.HashMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Functional test for {@link AntiJoinOperator}.
+ */
+public class AntiJoinOperatorTest
+{
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    AntiJoinOperator oper = new AntiJoinOperator();
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.outport, sink);
+
+    // join table1 and table2 rows on equal values of column "a"
+    Condition cond = new JoinColumnEqualCondition("a", "a");
+    oper.setJoinCondition(cond);
+
+    // emit only columns "b" and "c" of table1
+    oper.selectTable1Column(new ColumnIndex("b", null));
+    oper.selectTable1Column(new ColumnIndex("c", null));
+
+    oper.setup(null);
+
+    // test 1, positive result: the table1 row with a=1 has no table2 match
+    oper.beginWindow(1);
+    oper.inport1.process(row(0, 1, 2));
+    oper.inport1.process(row(1, 3, 4));
+    oper.inport2.process(row(0, 7, 8));
+    oper.inport2.process(row(2, 5, 6));
+    oper.endWindow();
+
+    // expected anti-joined result: {b=3, c=4}
+    Assert.assertEquals("number of anti-join result", 1, sink.collectedTuples.size());
+    HashMap<String, Object> expected = new HashMap<>();
+    expected.put("b", 3);
+    expected.put("c", 4);
+    Assert.assertEquals("anti-join result", expected, sink.collectedTuples.get(0));
+    sink.clear();
+
+    // test 2, negative result (empty result): every table1 row has a table2 match.
+    // Fresh maps are used so that no tuple already handed to the operator is mutated.
+    oper.beginWindow(2);
+    oper.inport1.process(row(0, 1, 2));
+    oper.inport1.process(row(1, 3, 4));
+    oper.inport2.process(row(0, 7, 8));
+    oper.inport2.process(row(1, 5, 6));
+    oper.endWindow();
+    oper.teardown();
+
+    // expected anti-joined result: []
+    Assert.assertEquals("number of anti-join result (empty)", 0, sink.collectedTuples.size());
+  }
+
+  /**
+   * Creates a new row with columns "a", "b" and "c".
+   */
+  private static HashMap<String, Object> row(int a, int b, int c)
+  {
+    HashMap<String, Object> tuple = new HashMap<>();
+    tuple.put("a", a);
+    tuple.put("b", b);
+    tuple.put("c", c);
+    return tuple;
+  }
+}
[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1812v2'
into devel-3
Posted by cs...@apache.org.
Merge branch 'MLHR-1812v2' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/217f8db4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/217f8db4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/217f8db4
Branch: refs/heads/devel-3
Commit: 217f8db4744c9a32601dc795554f39cae56319d1
Parents: 228d46c 9dac355
Author: Chandni Singh <cs...@apache.org>
Authored: Sat Nov 21 21:21:22 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Sat Nov 21 21:21:22 2015 -0800
----------------------------------------------------------------------
.../datatorrent/lib/join/AntiJoinOperator.java | 203 +++++++++++++++++++
.../lib/join/AntiJoinOperatorTest.java | 119 +++++++++++
2 files changed, 322 insertions(+)
----------------------------------------------------------------------