You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by hy...@apache.org on 2019/11/05 03:48:24 UTC
[calcite] branch master updated: [CALCITE-3400] Implement
left/right/semi/anti/full join in interpreter (Wang Yanlin)
This is an automated email from the ASF dual-hosted git repository.
hyuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new 01043b1 [CALCITE-3400] Implement left/right/semi/anti/full join in interpreter (Wang Yanlin)
01043b1 is described below
commit 01043b1d97fbba4c9d23b45f5aca7a19e1086765
Author: yanlin-Lynn <19...@163.com>
AuthorDate: Fri Oct 4 20:53:28 2019 +0800
[CALCITE-3400] Implement left/right/semi/anti/full join in interpreter (Wang Yanlin)
For an anti join, the right input of the join may look like this:
LogicalAggregate(group=[{0}], agg#0=[MIN($1)])
LogicalProject(X=[$0], $f1=[true])
LogicalValues(tuples=[[{ 1, 'd' }]])
so we need to calculate the minimum of boolean values; MinBoolean is added as a
user-defined aggregate for that purpose.
Close #1496
---
.../apache/calcite/interpreter/AggregateNode.java | 26 +++++
.../org/apache/calcite/interpreter/JoinNode.java | 130 ++++++++++++++++++---
.../org/apache/calcite/test/InterpreterTest.java | 73 ++++++++++++
3 files changed, 212 insertions(+), 17 deletions(-)
diff --git a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
index 401edf8..e689be1 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java
@@ -150,6 +150,9 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
case REAL:
clazz = MinDouble.class;
break;
+ case BOOLEAN:
+ clazz = MinBoolean.class;
+ break;
default:
clazz = MinLong.class;
break;
@@ -536,6 +539,29 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> {
}
}
+ /** Implementation of {@code MIN} function to calculate the minimum of
+ * {@code boolean} values as a user-defined aggregate.
+ *
+ * <p>The comparison uses {@link Boolean#compareTo}, which orders
+ * {@code false} before {@code true}; the result is therefore {@code false}
+ * as soon as any {@code false} value has been added. The accumulator starts
+ * at {@code true}.
+ *
+ * <p>NOTE(review): {@code add} calls {@code compareTo} on the accumulator
+ * with the incoming value, so a {@code null} value would throw
+ * {@code NullPointerException}; presumably the caller never passes nulls --
+ * confirm.
+ */
+ public static class MinBoolean {
+ public MinBoolean() { }
+
+ public Boolean init() { // initial accumulator value
+ return Boolean.TRUE;
+ }
+
+ public Boolean add(Boolean accumulator, Boolean value) { // folds one value into the accumulator
+ return accumulator.compareTo(value) < 0 ? accumulator : value; // keep the smaller of the two
+ }
+
+ public Boolean merge(Boolean accumulator0, Boolean accumulator1) { // combines two partial results
+ return add(accumulator0, accumulator1); // minimum of two minimums, same rule as add
+ }
+
+ public Boolean result(Boolean accumulator) { // final value is the accumulator itself
+ return accumulator;
+ }
+ }
+
/** Implementation of {@code MAX} function to calculate the maximum of
* {@code integer} values as a user-defined aggregate.
*/
diff --git a/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
index a167254..c870c90 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java
@@ -17,11 +17,14 @@
package org.apache.calcite.interpreter;
import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
/**
* Interpreter node that implements a
@@ -47,28 +50,121 @@ public class JoinNode implements Node {
}
public void run() throws InterruptedException {
- List<Row> rightList = null;
- final int leftCount = rel.getLeft().getRowType().getFieldCount();
- final int rightCount = rel.getRight().getRowType().getFieldCount();
- context.values = new Object[rel.getRowType().getFieldCount()];
- Row left;
- Row right;
- while ((left = leftSource.receive()) != null) {
- System.arraycopy(left.getValues(), 0, context.values, 0, leftCount);
- if (rightList == null) {
- rightList = new ArrayList<>();
- while ((right = rightSource.receive()) != null) {
- rightList.add(right);
+
+ final int fieldCount = rel.getLeft().getRowType().getFieldCount()
+ + rel.getRight().getRowType().getFieldCount();
+ context.values = new Object[fieldCount];
+
+ // source for the outer relation of nested loop
+ Source outerSource = leftSource;
+ // source for the inner relation of nested loop
+ Source innerSource = rightSource;
+ if (rel.getJoinType() == JoinRelType.RIGHT) {
+ outerSource = rightSource;
+ innerSource = leftSource;
+ }
+
+ // row from outer source
+ Row outerRow = null;
+ // rows from inner source
+ List<Row> innerRows = null;
+ Set<Row> matchRowSet = new HashSet<>();
+ while ((outerRow = outerSource.receive()) != null) {
+ if (innerRows == null) {
+ innerRows = new ArrayList<Row>();
+ Row innerRow = null;
+ while ((innerRow = innerSource.receive()) != null) {
+ innerRows.add(innerRow);
}
}
- for (Row right2 : rightList) {
- System.arraycopy(right2.getValues(), 0, context.values, leftCount,
- rightCount);
- final Boolean execute = (Boolean) condition.execute(context);
- if (execute != null && execute) {
+ matchRowSet.addAll(doJoin(outerRow, innerRows, rel.getJoinType()));
+ }
+ if (rel.getJoinType() == JoinRelType.FULL) {
+ // send un-match rows for full join on right source
+ List<Row> empty = new ArrayList<>();
+ for (Row row: innerRows) {
+ if (matchRowSet.contains(row)) {
+ continue;
+ }
+ doSend(row, empty, JoinRelType.RIGHT);
+ }
+ }
+ }
+
+ /**
+ * Execution of the join action, returns the matched rows for the outer source row.
+ *
+ * <p>Evaluates the join condition for {@code outerRow} against every row in
+ * {@code innerRows}, then passes the outer row and its matches to
+ * {@code doSend}, which sends the output appropriate for the join type.
+ *
+ * @param outerRow row from the outer source of the nested loop
+ * @param innerRows all rows of the inner source
+ * @param joinRelType join type; for {@code RIGHT} the outer row is copied to
+ * the right-hand side of the context, otherwise to the left-hand side
+ * @return the inner rows for which the condition evaluated to {@code true}
+ * @throws InterruptedException declared because {@code doSend} may throw it
+ */
+ private List<Row> doJoin(Row outerRow, List<Row> innerRows,
+ JoinRelType joinRelType) throws InterruptedException {
+ boolean outerRowOnLeft = joinRelType != JoinRelType.RIGHT; // RIGHT join iterates the right source as outer
+ copyToContext(outerRow, outerRowOnLeft); // outer half is written once, inner half per candidate
+ List<Row> matchInnerRows = new ArrayList<>();
+ for (Row innerRow: innerRows) {
+ copyToContext(innerRow, !outerRowOnLeft);
+ final Boolean execute = (Boolean) condition.execute(context);
+ if (execute != null && execute) { // a null condition result counts as no match
+ matchInnerRows.add(innerRow);
+ }
+ }
+ doSend(outerRow, matchInnerRows, joinRelType);
+ return matchInnerRows;
+ }
+
+ /**
+ * If matched rows exist for the outer row, sends the corresponding joined results;
+ * otherwise, checks whether a null-padded row (or the bare outer row) must be sent.
+ *
+ * <p>With matches: INNER, LEFT, RIGHT and FULL send one combined row per
+ * match; SEMI sends the outer row once; ANTI sends nothing. Without matches:
+ * LEFT, RIGHT and FULL send the outer row padded with nulls; ANTI sends the
+ * outer row; INNER and SEMI send nothing.
+ *
+ * @param outerRow row from the outer source
+ * @param matchInnerRows inner rows that satisfied the join condition; may be empty
+ * @param joinRelType join type that decides what is sent
+ * @throws InterruptedException declared because {@code sink.send} is called here
+ */
+ private void doSend(Row outerRow, List<Row> matchInnerRows,
+ JoinRelType joinRelType) throws InterruptedException {
+ if (!matchInnerRows.isEmpty()) {
+ switch (joinRelType) {
+ case INNER:
+ case LEFT:
+ case RIGHT:
+ case FULL:
+ boolean outerRowOnLeft = joinRelType != JoinRelType.RIGHT;
+ copyToContext(outerRow, outerRowOnLeft); // rewrite the outer half; earlier calls may have changed the context
+ for (Row row: matchInnerRows) {
+ copyToContext(row, !outerRowOnLeft);
sink.send(Row.asCopy(context.values));
}
+ break;
+ case SEMI:
+ sink.send(Row.asCopy(outerRow.getValues())); // only the outer row's own columns, sent once
+ break;
}
+ } else {
+ switch (joinRelType) {
+ case LEFT:
+ case RIGHT:
+ case FULL:
+ int nullColumnNum = context.values.length - outerRow.size(); // width of the side that has no match
+ // for full join, the left source is used as the outer source,
+ // and unmatched rows of the left source are sent first;
+ // the unmatched rows of the right source are processed later.
+ copyToContext(outerRow, joinRelType.generatesNullsOnRight()); // presumably true for LEFT and FULL -- confirm in JoinRelType
+ int nullColumnStart = joinRelType.generatesNullsOnRight() ? outerRow.size() : 0;
+ System.arraycopy(new Object[nullColumnNum], 0, // a fresh Object[] is all nulls, so this clears the other side
+ context.values, nullColumnStart, nullColumnNum);
+ sink.send(Row.asCopy(context.values));
+ break;
+ case ANTI:
+ sink.send(Row.asCopy(outerRow.getValues())); // no match found, so the outer row passes the anti join
+ break;
+ }
+ }
+ }
+
+ /**
+ * Copies the value of row into context values.
+ *
+ * <p>Only the slots covered by the row are overwritten; the rest of
+ * {@code context.values} is left as it was.
+ *
+ * @param row row whose values are copied
+ * @param toLeftSide if true the values are written starting at index 0,
+ * otherwise they are written so that they end at the last index
+ */
+ private void copyToContext(Row row, boolean toLeftSide) {
+ Object[] values = row.getValues();
+ if (toLeftSide) {
+ System.arraycopy(values, 0, context.values, 0, values.length);
+ } else {
+ System.arraycopy(values, 0, context.values, // right-aligned: start at (total length - row length)
+ context.values.length - values.length, values.length);
}
}
}
diff --git a/core/src/test/java/org/apache/calcite/test/InterpreterTest.java b/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
index 281717c..1b36dd3 100644
--- a/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
@@ -20,8 +20,12 @@ import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.interpreter.Interpreter;
import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgram;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.rules.SemiJoinRule;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
@@ -360,6 +364,75 @@ public class InterpreterTest {
sql(sql).returnsRows();
}
+ @Test public void testInterpretInnerJoin() throws Exception { // inner join of two VALUES relations on x
+ final String sql = "select * from\n"
+ + "(select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)) t\n"
+ + "join\n"
+ + "(select x, y from (values (1, 'd'), (2, 'c')) as t2(x, y)) t2\n"
+ + "on t.x = t2.x";
+ sql(sql).returnsRows("[1, a, 1, d]", "[2, b, 2, c]"); // x = 3 has no partner on the right, so no row for it
+ }
+
+ @Test public void testInterpretLeftOutJoin() throws Exception { // left outer join: every left row is kept
+ final String sql = "select * from\n"
+ + "(select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)) t\n"
+ + "left join\n"
+ + "(select x, y from (values (1, 'd')) as t2(x, y)) t2\n"
+ + "on t.x = t2.x";
+ sql(sql).returnsRows("[1, a, 1, d]", "[2, b, null, null]", "[3, c, null, null]"); // unmatched left rows get nulls on the right
+ }
+
+ @Test public void testInterpretRightOutJoin() throws Exception { // right outer join: every right row is kept
+ final String sql = "select * from\n"
+ + "(select x, y from (values (1, 'd')) as t2(x, y)) t2\n"
+ + "right join\n"
+ + "(select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)) t\n"
+ + "on t2.x = t.x";
+ sql(sql).returnsRows("[1, d, 1, a]", "[null, null, 2, b]", "[null, null, 3, c]"); // unmatched right rows get nulls on the left
+ }
+
+ @Test public void testInterpretSemanticSemiJoin() throws Exception { // IN sub-query run through sql(...); no planner rule is applied in this test
+ final String sql = "select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)\n"
+ + "where x in\n"
+ + "(select x from (values (1, 'd'), (3, 'g')) as t2(x, y))";
+ sql(sql).returnsRows("[1, a]", "[3, c]"); // only the rows whose x appears in the sub-query
+ }
+
+ @Test public void testInterpretSemiJoin() throws Exception { // same query as testInterpretSemanticSemiJoin, planned with SemiJoinRule first
+ final String sql = "select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)\n"
+ + "where x in\n"
+ + "(select x from (values (1, 'd'), (3, 'g')) as t2(x, y))";
+ SqlNode validate = planner.validate(planner.parse(sql)); // parse, then validate
+ RelNode convert = planner.rel(validate).rel; // relational tree for the validated query
+ final HepProgram program = new HepProgramBuilder() // program containing only SemiJoinRule.PROJECT
+ .addRuleInstance(SemiJoinRule.PROJECT)
+ .build();
+ final HepPlanner hepPlanner = new HepPlanner(program);
+ hepPlanner.setRoot(convert);
+ final RelNode relNode = hepPlanner.findBestExp(); // plan after the rule; presumably now contains a semi join -- confirm
+ final Interpreter interpreter = new Interpreter(dataContext, relNode); // interpret the rewritten plan directly
+ assertRows(interpreter, true, "[1, a]", "[3, c]");
+ }
+
+ @Test public void testInterpretAntiJoin() throws Exception { // NOT IN sub-query; presumably the case that needs MIN over booleans -- confirm
+ final String sql = "select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)\n"
+ + "where x not in \n"
+ + "(select x from (values (1, 'd')) as t2(x, y))";
+ sql(sql).returnsRows("[2, b]", "[3, c]"); // x = 1 appears in the sub-query, so that row is excluded
+ }
+
+ @Test public void testInterpretFullJoin() throws Exception { // full outer join: unmatched rows of both sides are kept
+ final String sql = "select * from\n"
+ + "(select x, y from (values (1, 'a'), (2, 'b'), (3, 'c')) as t(x, y)) t\n"
+ + "full join\n"
+ + "(select x, y from (values (1, 'd'), (2, 'c'), (4, 'x')) as t2(x, y)) t2\n"
+ + "on t.x = t2.x";
+ sql(sql).returnsRows(
+ "[1, a, 1, d]",
+ "[2, b, 2, c]",
+ "[3, c, null, null]", // left row with no match on the right
+ "[null, null, 4, x]"); // right row with no match on the left
+ }
}
// End InterpreterTest.java