You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by cs...@apache.org on 2015/11/22 06:23:31 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1812 implement anti join operator

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 228d46cec -> 217f8db47


MLHR-1812 implement anti join operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9dac355a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9dac355a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9dac355a

Branch: refs/heads/devel-3
Commit: 9dac355a4e8eb4185e9d973db11bfbf408a0dc4f
Parents: 5b9eff8
Author: Dongming Liang <do...@capitalone.com>
Authored: Fri Nov 20 16:12:18 2015 -0800
Committer: Dongming Liang <do...@capitalone.com>
Committed: Fri Nov 20 16:12:18 2015 -0800

----------------------------------------------------------------------
 .../datatorrent/lib/join/AntiJoinOperator.java  | 203 +++++++++++++++++++
 .../lib/join/AntiJoinOperatorTest.java          | 119 +++++++++++
 2 files changed, 322 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9dac355a/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java
new file mode 100644
index 0000000..382a0d6
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/AntiJoinOperator.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.index.Index;
+
+
+/**
+ * An implementation of Operator that reads table row data from two table data input ports. <br>
+ * <p>
+ * Operator anti-joins row on given condition and selected names, emits
+ * anti-joined result at output port.
+ * <br>
+ * <b>StateFull : Yes,</b> Operator aggregates input over application window. <br>
+ * <b>Partitions : No, </b> will yield wrong result(s). <br>
+ * <br>
+ * <b>Ports : </b> <br>
+ * <b> inport1 : </b> Input port for table 1, expects HashMap&lt;String, Object&gt; <br>
+ * <b> inport2 : </b> Input port for table 2, expects HashMap&lt;String, Object&gt; <br>
+ * <b> outport : </b> Output anti-joined row port, emits HashMap&lt;String, ArrayList&lt;Object&gt;&gt; <br>
+ * <br>
+ * <b> Properties : </b>
+ * <b> joinCondition : </b> Join condition for table rows. <br>
+ * <b> table1Columns : </b> Columns to be selected from table1. <br>
+ * <b> table2Columns : </b> Columns to be selected from table2. <br>
+ * <br>
+ *
+ * @displayName Anti join
+ * @category Stream Manipulators
+ * @tags sql, anti join operator
+ * @since 0.3.3
+ */
+@OperatorAnnotation(partitionable = false)
+@Evolving
+public class AntiJoinOperator implements Operator
+{
+
+  /**
+   * Join Condition;
+   */
+  private Condition joinCondition;
+
+  /**
+   * Table1 select columns.
+   * Note: only left table (Table1) will be output in an Anti-join
+   */
+  private ArrayList<Index> table1Columns = new ArrayList<>();
+
+  /**
+   * Collect data rows from input port 1.
+   */
+  private List<Map<String, Object>> table1;
+
+  /**
+   * Collect data from input port 2.
+   */
+  private List<Map<String, Object>> table2;
+
+  /**
+   * Input port 1 that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport1 = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      table1.add(tuple);
+      for (int j = 0; j < table2.size(); j++) {
+        if ((joinCondition != null) && (joinCondition.isValidJoin(tuple, table2.get(j)))) {
+          table1.remove(tuple);
+        }
+      }
+    }
+  };
+
+  /**
+   * Input port 2 that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      table2.add(tuple);
+
+      for (int j = 0; j < table1.size(); j++) {
+        if ((joinCondition != null)
+            && (joinCondition.isValidJoin(table1.get(j), tuple))) {
+          table1.remove(table1.get(j));
+        }
+      }
+    }
+  };
+
+  /**
+   * Output port that emits a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<>();
+
+  @Override
+  public void setup(OperatorContext arg0)
+  {
+    table1 = new ArrayList<>();
+    table2 = new ArrayList<>();
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  @Override
+  public void beginWindow(long arg0)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+    /* All joined rows have been removed
+     * The ones left are the anti-joined result
+     */
+    for (int i = 0; i < table1.size(); i++) {
+      joinRows(table1.get(i));
+    }
+
+    table1.clear();
+    table2.clear();
+  }
+
+  /**
+   * @return the joinCondition
+   */
+  public Condition getJoinCondition()
+  {
+    return joinCondition;
+  }
+
+  /**
+   * Pick the supported condition. Currently only equal join is supported.
+   *
+   * @param joinCondition - join condition
+   */
+  public void setJoinCondition(Condition joinCondition)
+  {
+    this.joinCondition = joinCondition;
+  }
+
+  /**
+   * Select table1 column name.
+   */
+  public void selectTable1Column(Index column)
+  {
+    table1Columns.add(column);
+  }
+
+  /**
+   * Join row from table1 and table2.
+   */
+  protected void joinRows(Map<String, Object> row)
+  {
+    // joined row
+    Map<String, Object> join = new HashMap<>();
+
+    // filter table1 columns
+    if (row != null) {
+      for (Index index : table1Columns) {
+        index.filter(row, join);
+      }
+    }
+
+    // emit row
+    outport.emit(join);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9dac355a/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java
new file mode 100644
index 0000000..4241a80
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/join/AntiJoinOperatorTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.HashMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ *
+ * Functional test for {@link AntiJoinOperator }.
+ *
+ */
+public class AntiJoinOperatorTest
+{
+  /**
+   * Creates an operator that anti-joins on column "a" and selects columns "b" and "c" of
+   * table 1, with its output port wired to the given sink. setup() has already been called.
+   */
+  private static AntiJoinOperator createOperator(CollectorTestSink sink)
+  {
+    AntiJoinOperator oper = new AntiJoinOperator();
+    TestUtils.setSink(oper.outport, sink);
+
+    // set column join condition
+    Condition cond = new JoinColumnEqualCondition("a", "a");
+    oper.setJoinCondition(cond);
+
+    // add columns
+    oper.selectTable1Column(new ColumnIndex("b", null));
+    oper.selectTable1Column(new ColumnIndex("c", null));
+
+    oper.setup(null);
+    return oper;
+  }
+
+  /**
+   * Builds a fresh row {a=a, b=b, c=c}; every call returns a new map so that rows held by the
+   * operator are never mutated by the test.
+   */
+  private static HashMap<String, Object> row(int a, int b, int c)
+  {
+    HashMap<String, Object> tuple = new HashMap<>();
+    tuple.put("a", a);
+    tuple.put("b", b);
+    tuple.put("c", c);
+    return tuple;
+  }
+
+  @Test
+  public void testSqlSelect()
+  {
+    CollectorTestSink sink = new CollectorTestSink();
+    AntiJoinOperator oper = createOperator(sink);
+
+    // test 1, positive result
+    oper.beginWindow(1);
+    oper.inport1.process(row(0, 1, 2));
+    oper.inport1.process(row(1, 3, 4));
+    oper.inport2.process(row(0, 7, 8));
+    oper.inport2.process(row(2, 5, 6));
+    oper.endWindow();
+
+    // expected anti-joined result: {b=3, c=4}
+    Assert.assertEquals("number of anti-join result", 1, sink.collectedTuples.size());
+    HashMap<String, Object> expected = new HashMap<>();
+    expected.put("b", 3);
+    expected.put("c", 4);
+    Assert.assertEquals("anti-join result", expected, sink.collectedTuples.get(0));
+    sink.clear();
+
+    // test 2, negative result (empty result)
+    oper.beginWindow(2);
+    oper.inport1.process(row(0, 1, 2));
+    oper.inport1.process(row(1, 3, 4));
+    oper.inport2.process(row(0, 7, 8));
+    oper.inport2.process(row(1, 5, 6));
+    oper.endWindow();
+    oper.teardown();
+
+    // expected anti-joined result: []
+    Assert.assertEquals("number of anti-join result (empty)", 0, sink.collectedTuples.size());
+  }
+
+  @Test
+  public void testConsecutiveMatchingRowsAreAllRemoved()
+  {
+    CollectorTestSink sink = new CollectorTestSink();
+    AntiJoinOperator oper = createOperator(sink);
+
+    // two adjacent table 1 rows match the same table 2 row that arrives afterwards
+    oper.beginWindow(1);
+    oper.inport1.process(row(5, 1, 2));
+    oper.inport1.process(row(5, 3, 4));
+    oper.inport2.process(row(5, 7, 8));
+    oper.endWindow();
+    oper.teardown();
+
+    Assert.assertEquals("all matching rows must be removed", 0, sink.collectedTuples.size());
+  }
+}


[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1812v2' into devel-3

Posted by cs...@apache.org.
Merge branch 'MLHR-1812v2' into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/217f8db4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/217f8db4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/217f8db4

Branch: refs/heads/devel-3
Commit: 217f8db4744c9a32601dc795554f39cae56319d1
Parents: 228d46c 9dac355
Author: Chandni Singh <cs...@apache.org>
Authored: Sat Nov 21 21:21:22 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Sat Nov 21 21:21:22 2015 -0800

----------------------------------------------------------------------
 .../datatorrent/lib/join/AntiJoinOperator.java  | 203 +++++++++++++++++++
 .../lib/join/AntiJoinOperatorTest.java          | 119 +++++++++++
 2 files changed, 322 insertions(+)
----------------------------------------------------------------------