You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by hy...@apache.org on 2019/11/05 03:48:24 UTC

[calcite] branch master updated: [CALCITE-3400] Implement left/right/semi/anti/full join in interpreter (Wang Yanlin)

This is an automated email from the ASF dual-hosted git repository.

hyuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new 01043b1  [CALCITE-3400] Implement left/right/semi/anti/full join in interpreter (Wang Yanlin)
01043b1 is described below

commit 01043b1d97fbba4c9d23b45f5aca7a19e1086765
Author: yanlin-Lynn <19...@163.com>
AuthorDate: Fri Oct 4 20:53:28 2019 +0800

    [CALCITE-3400] Implement left/right/semi/anti/full join in interpreter (Wang Yanlin)
    
    For an anti join, the right input of the join may look like this:
    LogicalAggregate(group=[{0}], agg#0=[MIN($1)])
       LogicalProject(X=[$0], $f1=[true])
          LogicalValues(tuples=[[{ 1, 'd' }]])
    so the minimum of boolean values must be calculated; MinBoolean is used
    as a user-defined aggregate for that.
    
    Close #1496
---
 .../apache/calcite/interpreter/AggregateNode.java  |  26 +++++
 .../org/apache/calcite/interpreter/JoinNode.java   | 130 ++++++++++++++++++---
 .../org/apache/calcite/test/InterpreterTest.java   |  73 ++++++++++++
 3 files changed, 212 insertions(+), 17 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
index 401edf8..e689be1 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
@@ -150,6 +150,9 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
       case REAL:
         clazz = MinDouble.class;
         break;
+      case BOOLEAN:
+        clazz = MinBoolean.class;
+        break;
       default:
         clazz = MinLong.class;
         break;
@@ -536,6 +539,29 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
     }
   }
 
+  /** Implementation of {@code MIN} function to calculate the minimum of
+   * {@code boolean} values, ignoring nulls, as a user-defined aggregate.
+   */
+  public static class MinBoolean {
+    public MinBoolean() { }
+
+    public Boolean init() {
+      return Boolean.TRUE;
+    }
+
+    public Boolean add(Boolean accumulator, Boolean value) {
+      return value == null || accumulator.compareTo(value) < 0 ? accumulator : value;
+    }
+
+    public Boolean merge(Boolean accumulator0, Boolean accumulator1) {
+      return add(accumulator0, accumulator1);
+    }
+
+    public Boolean result(Boolean accumulator) {
+      return accumulator;
+    }
+  }
+
   /** Implementation of {@code MAX} function to calculate the minimum of
    * {@code integer} values as a user-defined aggregate.
    */
diff --git a/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
index a167254..c870c90 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
@@ -17,11 +17,14 @@
 package org.apache.calcite.interpreter;
 
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
 
 import com.google.common.collect.ImmutableList;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Interpreter node that implements a
@@ -47,28 +50,131 @@ public class JoinNode implements Node {
   }
 
   public void run() throws InterruptedException {
-    List<Row> rightList = null;
-    final int leftCount = rel.getLeft().getRowType().getFieldCount();
-    final int rightCount = rel.getRight().getRowType().getFieldCount();
-    context.values = new Object[rel.getRowType().getFieldCount()];
-    Row left;
-    Row right;
-    while ((left = leftSource.receive()) != null) {
-      System.arraycopy(left.getValues(), 0, context.values, 0, leftCount);
-      if (rightList == null) {
-        rightList = new ArrayList<>();
-        while ((right = rightSource.receive()) != null) {
-          rightList.add(right);
+
+    final int fieldCount = rel.getLeft().getRowType().getFieldCount()
+        + rel.getRight().getRowType().getFieldCount();
+    context.values = new Object[fieldCount];
+
+    // source for the outer relation of nested loop
+    Source outerSource = leftSource;
+    // source for the inner relation of nested loop
+    Source innerSource = rightSource;
+    if (rel.getJoinType() == JoinRelType.RIGHT) {
+      outerSource = rightSource;
+      innerSource = leftSource;
+    }
+
+    // row from outer source
+    Row outerRow = null;
+    // rows from inner source; read lazily, when the first outer row arrives
+    List<Row> innerRows = null;
+    Set<Row> matchRowSet = new HashSet<>();
+    while ((outerRow = outerSource.receive()) != null) {
+      if (innerRows == null) {
+        innerRows = new ArrayList<>();
+        Row innerRow = null;
+        while ((innerRow = innerSource.receive()) != null) {
+          innerRows.add(innerRow);
         }
       }
-      for (Row right2 : rightList) {
-        System.arraycopy(right2.getValues(), 0, context.values, leftCount,
-            rightCount);
-        final Boolean execute = (Boolean) condition.execute(context);
-        if (execute != null && execute) {
+      matchRowSet.addAll(doJoin(outerRow, innerRows, rel.getJoinType()));
+    }
+    if (rel.getJoinType() == JoinRelType.FULL) {
+      if (innerRows == null) {
+        // The left (outer) source was empty, so the right (inner) rows have
+        // not been read yet. Read them now: every one of them is unmatched
+        // and must still be emitted, padded with nulls on the left.
+        innerRows = new ArrayList<>();
+        Row innerRow;
+        while ((innerRow = innerSource.receive()) != null) {
+          innerRows.add(innerRow);
+        }
+      }
+      // send un-matched rows of the right source, padded with nulls on the left
+      List<Row> empty = new ArrayList<>();
+      for (Row row: innerRows) {
+        if (matchRowSet.contains(row)) {
+          continue;
+        }
+        doSend(row, empty, JoinRelType.RIGHT);
+      }
+    }
+  }
+
+  /**
+   * Evaluates the join condition for one outer row against each inner row,
+   * sends the resulting output rows, and returns the inner rows that matched.
+   */
+  private List<Row> doJoin(Row outerRow, List<Row> innerRows,
+      JoinRelType joinRelType) throws InterruptedException {
+    copyToContext(outerRow, joinRelType != JoinRelType.RIGHT);
+    final List<Row> matched = new ArrayList<>();
+    for (Row candidate : innerRows) {
+      copyToContext(candidate, joinRelType == JoinRelType.RIGHT);
+      final Boolean satisfied = (Boolean) condition.execute(context);
+      if (Boolean.TRUE.equals(satisfied)) {
+        matched.add(candidate);
+      }
+    }
+    doSend(outerRow, matched, joinRelType);
+    return matched;
+  }
+
+  /**
+   * Emits output for an outer row: joined rows (or the row itself, for SEMI) if there are
+   * matches; otherwise the row padded with nulls (LEFT/RIGHT/FULL) or the row itself (ANTI).
+   */
+  private void doSend(Row outerRow, List<Row> matchInnerRows,
+      JoinRelType joinRelType) throws InterruptedException {
+    if (!matchInnerRows.isEmpty()) {
+      switch (joinRelType) {
+      case INNER:
+      case LEFT:
+      case RIGHT:
+      case FULL:
+        boolean outerRowOnLeft = joinRelType != JoinRelType.RIGHT;
+        copyToContext(outerRow, outerRowOnLeft);
+        for (Row row: matchInnerRows) {
+          copyToContext(row, !outerRowOnLeft);
           sink.send(Row.asCopy(context.values));
         }
+        break;
+      case SEMI:
+        sink.send(Row.asCopy(outerRow.getValues()));
+        break;
       }
+    } else {
+      switch (joinRelType) {
+      case LEFT:
+      case RIGHT:
+      case FULL:
+        int nullColumnNum = context.values.length - outerRow.size();
+        // For a full join the left source is the outer source, so unmatched
+        // left rows are sent here first; unmatched right rows are sent
+        // later, by run(), which passes JoinRelType.RIGHT.
+        copyToContext(outerRow, joinRelType.generatesNullsOnRight());
+        int nullColumnStart = joinRelType.generatesNullsOnRight() ? outerRow.size() : 0;
+        System.arraycopy(new Object[nullColumnNum], 0,
+            context.values, nullColumnStart, nullColumnNum);
+        sink.send(Row.asCopy(context.values));
+        break;
+      case ANTI:
+        sink.send(Row.asCopy(outerRow.getValues()));
+        break;
+      }
+    }
+  }
+
+  /**
+   * Copies the values of {@code row} into the left or right end of {@code context.values}.
+   */
+  private void copyToContext(Row row, boolean toLeftSide) {
+    final Object[] rowValues = row.getValues();
+    if (!toLeftSide) {
+      final int offset = context.values.length - rowValues.length;
+      System.arraycopy(rowValues, 0, context.values, offset, rowValues.length);
+    } else {
+      System.arraycopy(rowValues, 0, context.values, 0, rowValues.length);
     }
   }
 }
diff --git a/core/src/test/java/org/apache/calcite/test/InterpreterTest.java b/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
index 281717c..1b36dd3 100644
--- a/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
@@ -20,8 +20,12 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.interpreter.Interpreter;
 import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgram;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.rules.SemiJoinRule;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParser;
@@ -360,6 +364,75 @@ public class InterpreterTest {
     sql(sql).returnsRows();
   }
 
+  @Test public void testInterpretInnerJoin() throws Exception {
+    sql("select * from\n"
+        + "(select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)) t\n"
+        + "join\n"
+        + "(select x, y from (values (1, 'd'), (2, 'c')) as t2(x, y)) t2\n"
+        + "on t.x = t2.x")
+        .returnsRows("[1, a, 1, d]", "[2, b, 2, c]");
+  }
+
+  @Test public void testInterpretLeftOutJoin() throws Exception {
+    sql("select * from\n"
+        + "(select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)) t\n"
+        + "left join\n"
+        + "(select x, y from (values (1, 'd')) as t2(x, y)) t2\n"
+        + "on t.x = t2.x")
+        .returnsRows("[1, a, 1, d]", "[2, b, null, null]", "[3, c, null, null]");
+  }
+
+  @Test public void testInterpretRightOutJoin() throws Exception {
+    sql("select * from\n"
+        + "(select x, y from (values (1, 'd')) as t2(x, y)) t2\n"
+        + "right join\n"
+        + "(select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)) t\n"
+        + "on t2.x = t.x")
+        .returnsRows("[1, d, 1, a]", "[null, null, 2, b]", "[null, null, 3, c]");
+  }
+
+  @Test public void testInterpretSemanticSemiJoin() throws Exception {
+    sql("select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)\n"
+        + "where x in\n"
+        + "(select x from (values (1, 'd'), (3, 'g')) as t2(x, y))")
+        .returnsRows("[1, a]", "[3, c]");
+  }
+
+  @Test public void testInterpretSemiJoin() throws Exception {
+    final String sql = "select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)\n"
+        + "where x in\n"
+        + "(select x from (values (1, 'd'), (3, 'g')) as t2(x, y))";
+    final SqlNode parsed = planner.parse(sql);
+    final RelNode logical = planner.rel(planner.validate(parsed)).rel;
+    // SemiJoinRule.PROJECT is expected to rewrite the IN sub-query as a semi-join
+    final HepProgram semiJoinProgram =
+        new HepProgramBuilder().addRuleInstance(SemiJoinRule.PROJECT).build();
+    final HepPlanner hepPlanner = new HepPlanner(semiJoinProgram);
+    hepPlanner.setRoot(logical);
+    final RelNode optimized = hepPlanner.findBestExp();
+    assertRows(new Interpreter(dataContext, optimized), true,
+        "[1, a]", "[3, c]");
+  }
+
+  @Test public void testInterpretAntiJoin() throws Exception {
+    sql("select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)\n"
+        + "where x not in \n"
+        + "(select x from (values (1, 'd')) as t2(x, y))")
+        .returnsRows("[2, b]", "[3, c]");
+  }
+
+  @Test public void testInterpretFullJoin() throws Exception {
+    sql("select * from\n"
+        + "(select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)) t\n"
+        + "full join\n"
+        + "(select x, y from (values (1, 'd'), (2, 'c'), (4, 'x')) as t2(x, y)) t2\n"
+        + "on t.x = t2.x")
+        .returnsRows(
+            "[1, a, 1, d]",
+            "[2, b, 2, c]",
+            "[3, c, null, null]",
+            "[null, null, 4, x]");
+  }
 }
 
 // End InterpreterTest.java