Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:33:59 UTC

[GitHub] [ignite] rkondakov commented on a change in pull request #7915: IGNITE-12868 Calcite integration. LEFT, RIGHT join support

rkondakov commented on a change in pull request #7915:
URL: https://github.com/apache/ignite/pull/7915#discussion_r437335646



##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
##########
@@ -151,6 +152,285 @@ public void testUnionAll() {
         assertEquals(12, res.size());
     }
 
+    /** */
+    @Test
+    public void testLeftJoin() {
+        //    select e.id, e.name, d.name as dep_name
+        //      from emp e
+        // left join dep d
+        //        on e.depno = d.depno
+
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{0, "Igor", 1},
+            new Object[]{1, "Roman", 2},
+            new Object[]{2, "Ivan", null},
+            new Object[]{3, "Alexey", 1}
+        ));
+
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{1, "Core"},
+            new Object[]{2, "SQL"}
+        ));
+
+        LeftJoinNode<Object[]> join = new LeftJoinNode<>(
+            ctx,
+            r -> r[2] == r[3],
+            new RowHandler.RowFactory<Object[]>() {
+                @Override public Object[] create() {
+                    return new Object[2];
+                }
+
+                @Override public Object[] create(Object... fields) {
+                    return create();
+                }
+            }
+        );
+        join.register(F.asList(persons, deps));
+
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1], r[4]});
+        project.register(join);
+
+        RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+        node.register(project);
+
+        assert node.hasNext();
+
+        ArrayList<Object[]> rows = new ArrayList<>();
+
+        while (node.hasNext())
+            rows.add(node.next());
+
+        assertEquals(4, rows.size());
+
+        Assert.assertArrayEquals(new Object[]{0, "Igor", "Core"}, rows.get(0));
+        Assert.assertArrayEquals(new Object[]{1, "Roman", "SQL"}, rows.get(1));
+        Assert.assertArrayEquals(new Object[]{2, "Ivan", null}, rows.get(2));
+        Assert.assertArrayEquals(new Object[]{3, "Alexey", "Core"}, rows.get(3));
+    }
+
+    /** */
+    @Test
+    public void testRightJoin() {
+        //     select e.id, e.name, d.name as dep_name
+        //       from dep d
+        // right join emp e
+        //         on e.depno = d.depno
+
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{0, "Igor", 1},
+            new Object[]{1, "Roman", 2},
+            new Object[]{2, "Ivan", null},
+            new Object[]{3, "Alexey", 1}
+        ));
+
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{1, "Core"},
+            new Object[]{2, "SQL"}
+        ));
+
+        RightJoinNode<Object[]> join = new RightJoinNode<>(
+            ctx,
+            r -> r[0] == r[4],
+            new RowHandler.RowFactory<Object[]>() {
+                @Override public Object[] create() {
+                    return new Object[2];
+                }
+
+                @Override public Object[] create(Object... fields) {
+                    return create();
+                }
+            }
+        );
+        join.register(F.asList(deps, persons));
+
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[2], r[3], r[1]});
+        project.register(join);
+
+        RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+        node.register(project);
+
+        assert node.hasNext();
+
+        ArrayList<Object[]> rows = new ArrayList<>();
+
+        while (node.hasNext())
+            rows.add(node.next());
+
+        assertEquals(4, rows.size());
+
+        Assert.assertArrayEquals(new Object[]{0, "Igor", "Core"}, rows.get(0));
+        Assert.assertArrayEquals(new Object[]{1, "Roman", "SQL"}, rows.get(1));
+        Assert.assertArrayEquals(new Object[]{2, "Ivan", null}, rows.get(2));
+        Assert.assertArrayEquals(new Object[]{3, "Alexey", "Core"}, rows.get(3));
+    }
+
+    /** */
+    @Test
+    public void testFullOuterJoin() {
+        //          select e.id, e.name, d.name as dep_name
+        //            from emp e
+        // full outer join dep d
+        //              on e.depno = d.depno
+
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{0, "Igor", 1},
+            new Object[]{1, "Roman", 2},
+            new Object[]{2, "Ivan", null},
+            new Object[]{3, "Alexey", 1}
+        ));
+
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{1, "Core"},
+            new Object[]{2, "SQL"},
+            new Object[]{3, "QA"}
+        ));
+
+        FullOuterJoinNode<Object[]> join = new FullOuterJoinNode<>(
+            ctx,
+            r -> r[2] == r[3],
+            new RowHandler.RowFactory<Object[]>() {
+                @Override public Object[] create() {
+                    return new Object[3];
+                }
+
+                @Override public Object[] create(Object... fields) {
+                    return create();
+                }
+            },
+            new RowHandler.RowFactory<Object[]>() {
+                @Override public Object[] create() {
+                    return new Object[2];
+                }
+
+                @Override public Object[] create(Object... fields) {
+                    return create();
+                }
+            }
+        );
+        join.register(F.asList(persons, deps));
+
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1], r[4]});
+        project.register(join);
+
+        RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+        node.register(project);
+
+        assert node.hasNext();
+
+        ArrayList<Object[]> rows = new ArrayList<>();
+
+        while (node.hasNext())
+            rows.add(node.next());
+
+        assertEquals(5, rows.size());
+
+        Assert.assertArrayEquals(new Object[]{0, "Igor", "Core"}, rows.get(0));
+        Assert.assertArrayEquals(new Object[]{1, "Roman", "SQL"}, rows.get(1));
+        Assert.assertArrayEquals(new Object[]{2, "Ivan", null}, rows.get(2));
+        Assert.assertArrayEquals(new Object[]{3, "Alexey", "Core"}, rows.get(3));
+        Assert.assertArrayEquals(new Object[]{null, null, "QA"}, rows.get(4));
+    }
+
+    /** */
+    @Test
+    public void testSemiJoin() {
+        //    select d.name as dep_name
+        //      from dep d
+        // semi join emp e
+        //        on e.depno = d.depno
+
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{0, "Igor", 1},
+            new Object[]{1, "Roman", 2},
+            new Object[]{2, "Ivan", null},
+            new Object[]{3, "Alexey", 1}
+        ));
+
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{1, "Core"},
+            new Object[]{2, "SQL"}

Review comment:
       Can we add `new Object[]{3, "QA"}` here to make sure that a department without employees is not part of the output?
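   With that row added, the expected result should still be only `Core` and `SQL`. Below is a minimal sketch that computes this expectation independently of the execution nodes. It assumes the row layouts used in the test: `{depno, name}` for departments and `{id, name, depno}` for employees. `SemiJoinReference` is a hypothetical helper and is not part of this PR:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical reference semi join, used only to compute the expected test output. */
final class SemiJoinReference {
    private SemiJoinReference() {}

    /**
     * Returns the name of every department that has at least one employee.
     * Department row: {depno, name}; employee row: {id, name, depno}.
     */
    static List<Object> expectedDepNames(List<Object[]> deps, List<Object[]> emps) {
        List<Object> res = new ArrayList<>();

        for (Object[] d : deps) {
            // A NULL depno never matches: SQL equality with NULL is unknown, not true.
            boolean hasEmp = emps.stream()
                .anyMatch(e -> e[2] != null && Objects.equals(e[2], d[0]));

            if (hasEmp)
                res.add(d[1]); // each department is emitted at most once
        }

        return res;
    }
}
```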

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AntiJoinNode.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** */
+public class AntiJoinNode<Row> extends AbstractNode<Row> {
+    /** */
+    private final Predicate<Row> cond;
+
+    /** */
+    private final RowHandler<Row> handler;
+
+    /** */
+    private int requested;
+
+    /** */
+    private int waitingLeft;
+
+    /** */
+    private int waitingRight;
+
+    /** */
+    private final List<Row> rightMaterialized = new ArrayList<>(IN_BUFFER_SIZE);
+
+    /** */
+    private final Deque<Row> leftInBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
+
+    /** */
+    private boolean inLoop;
+
+    /** */
+    private Row left;
+
+    /** */
+    private int rightIdx;
+
+    /**
+     * @param ctx Execution context.
+     * @param cond Join expression.
+     */
+    public AntiJoinNode(ExecutionContext<Row> ctx, Predicate<Row> cond) {
+        super(ctx);
+
+        this.cond = cond;
+        handler = ctx.rowHandler();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) {
+        checkThread();
+
+        assert !F.isEmpty(sources) && sources.size() == 2;
+        assert rowsCnt > 0 && requested == 0;
+
+        requested = rowsCnt;
+
+        if (!inLoop)
+            context().execute(this::flushFromBuffer);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Row> requestDownstream(int idx) {
+        if (idx == 0)
+            return new Downstream<Row>() {
+                /** {@inheritDoc} */
+                @Override public void push(Row row) {
+                    pushLeft(row);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void end() {
+                    endLeft();
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onError(Throwable e) {
+                    AntiJoinNode.this.onError(e);
+                }
+            };
+        else if (idx == 1)
+            return new Downstream<Row>() {
+                /** {@inheritDoc} */
+                @Override public void push(Row row) {
+                    pushRight(row);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void end() {
+                    endRight();
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onError(Throwable e) {
+                    AntiJoinNode.this.onError(e);
+                }
+            };
+
+        throw new IndexOutOfBoundsException();
+    }
+
+    /** */
+    private void pushLeft(Row row) {
+        checkThread();
+
+        assert downstream != null;
+        assert waitingLeft > 0;
+
+        waitingLeft--;
+
+        leftInBuf.add(row);
+
+        flushFromBuffer();
+    }
+
+    /** */
+    private void pushRight(Row row) {
+        checkThread();
+
+        assert downstream != null;
+        assert waitingRight > 0;
+
+        waitingRight--;
+
+        rightMaterialized.add(row);
+
+        if (waitingRight == 0)
+            sources.get(1).request(waitingRight = IN_BUFFER_SIZE);
+    }
+
+    /** */
+    private void endLeft() {
+        checkThread();
+
+        assert downstream != null;
+        assert waitingLeft > 0;
+
+        waitingLeft = -1;
+
+        flushFromBuffer();
+    }
+
+    /** */
+    private void endRight() {
+        checkThread();
+
+        assert downstream != null;
+        assert waitingRight > 0;
+
+        waitingRight = -1;
+
+        flushFromBuffer();
+    }
+
+    /** */
+    private void onError(Throwable e) {
+        checkThread();
+
+        assert downstream != null;
+
+        downstream.onError(e);
+    }
+
+    /** */
+    private void flushFromBuffer() {
+        inLoop = true;
+        try {
+            if (waitingRight == -1) {

Review comment:
       What does `waitingRight == -1` mean? It is not clear from the code. If it is a special value, we can put it into a static constant with a meaningful name. That would improve readability.
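   Judging by `endRight()`, which assigns `-1`, the value seems to mean that the right input has ended. The sketch below shows what a named constant could look like. `NOT_WAITING`, `InputState` and `rightExhausted()` are hypothetical names, and the sketch models only the right-input bookkeeping:

```java
/**
 * Hypothetical illustration of replacing a bare -1 with a named constant.
 * Only the right-input bookkeeping of a two-input join node is modeled.
 */
final class InputState {
    /** Value stored in the "waiting" counter once the source has called end(). */
    static final int NOT_WAITING = -1;

    /** Rows requested from the right source but not received yet. */
    private int waitingRight;

    void requestRight(int cnt) {
        waitingRight = cnt;
    }

    void pushRight() {
        assert waitingRight > 0;
        waitingRight--;
    }

    void endRight() {
        waitingRight = NOT_WAITING;
    }

    /** Reads better at the call site than {@code waitingRight == -1}. */
    boolean rightExhausted() {
        return waitingRight == NOT_WAITING;
    }
}
```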

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FullOuterJoinNode.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** */
+public class FullOuterJoinNode<Row> extends AbstractNode<Row> {
+    /** Left row factory. */
+    private final RowHandler.RowFactory<Row> leftRowFactory;
+
+    /** Right row factory. */
+    private final RowHandler.RowFactory<Row> rightRowFactory;
+
+    /** Whether current left row was matched or not. */
+    private boolean leftMatched;
+
+    /** */
+    private final Set<Row> rightNotMatched = new HashSet<>();

Review comment:
       Using a `HashSet` here does not look like a good idea, because in general `Row` is not guaranteed to override `hashCode()`/`equals()`. In particular, the current row implementation is a plain `Object[]`, which inherits identity-based `hashCode()`/`equals()` from `Object`. I agree that we need set semantics here, but we need to think through its implementation more carefully.
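   One possible direction is to track unmatched right rows by their index with a `BitSet` instead of storing the rows themselves. This approach never calls `hashCode()`/`equals()` on rows, and duplicate right rows stay distinct. The sketch assumes the right input has been fully materialized into a list before probing starts. `RightMatchTracker` and its methods are hypothetical and do not come from this PR:

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.function.BiPredicate;

/**
 * Hypothetical sketch: remember which materialized right rows never matched,
 * keyed by list position rather than by Row#hashCode()/equals().
 */
final class RightMatchTracker<Row> {
    private final List<Row> rightMaterialized;

    /** Bit i stays set while rightMaterialized.get(i) has matched no left row. */
    private final BitSet rightNotMatched = new BitSet();

    RightMatchTracker(List<Row> rightMaterialized) {
        this.rightMaterialized = rightMaterialized;
        rightNotMatched.set(0, rightMaterialized.size());
    }

    /** Tests every right row against the left row; returns the number of matches. */
    int probe(Row left, BiPredicate<Row, Row> cond) {
        int matches = 0;
        for (int i = 0; i < rightMaterialized.size(); i++) {
            if (cond.test(left, rightMaterialized.get(i))) {
                rightNotMatched.clear(i);
                matches++;
            }
        }
        return matches;
    }

    /** Right rows that never matched, in input order (to be padded with NULLs on the left). */
    List<Row> unmatchedRight() {
        List<Row> res = new ArrayList<>();
        for (int i = rightNotMatched.nextSetBit(0); i >= 0; i = rightNotMatched.nextSetBit(i + 1))
            res.add(rightMaterialized.get(i));
        return res;
    }
}
```

   For the current rows, two arrays with the same contents are not equal: `new Object[]{1, "Core"}.equals(new Object[]{1, "Core"})` is `false`. For this reason, content-based set membership does not work with `Object[]` out of the box.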

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SemiJoinNode.java
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** */
+public class SemiJoinNode<Row> extends AbstractNode<Row> {
+    /** */
+    private final Predicate<Row> cond;
+
+    /** */
+    private final RowHandler<Row> handler;
+
+    /** */
+    private int requested;
+
+    /** */
+    private int waitingLeft;
+
+    /** */
+    private int waitingRight;
+
+    /** */
+    private final List<Row> rightMaterialized = new ArrayList<>(IN_BUFFER_SIZE);
+
+    /** */
+    private final Deque<Row> leftInBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
+
+    /** */
+    private boolean inLoop;
+
+    /** */
+    private Row left;
+
+    /** */
+    private int rightIdx;
+
+    /**
+     * @param ctx Execution context.
+     * @param cond Join expression.
+     */
+    public SemiJoinNode(ExecutionContext<Row> ctx, Predicate<Row> cond) {
+        super(ctx);
+
+        this.cond = cond;
+        handler = ctx.rowHandler();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) {
+        checkThread();
+
+        assert !F.isEmpty(sources) && sources.size() == 2;
+        assert rowsCnt > 0 && requested == 0;
+
+        requested = rowsCnt;
+
+        if (!inLoop)
+            context().execute(this::flushFromBuffer);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Row> requestDownstream(int idx) {
+        if (idx == 0)
+            return new Downstream<Row>() {
+                /** {@inheritDoc} */
+                @Override public void push(Row row) {
+                    pushLeft(row);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void end() {
+                    endLeft();
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onError(Throwable e) {
+                    SemiJoinNode.this.onError(e);
+                }
+            };
+        else if (idx == 1)
+            return new Downstream<Row>() {
+                /** {@inheritDoc} */
+                @Override public void push(Row row) {
+                    pushRight(row);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void end() {
+                    endRight();
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onError(Throwable e) {
+                    SemiJoinNode.this.onError(e);
+                }
+            };
+
+        throw new IndexOutOfBoundsException();
+    }
+
+    /** */
+    private void pushLeft(Row row) {
+        checkThread();
+
+        assert downstream != null;
+        assert waitingLeft > 0;
+
+        waitingLeft--;
+
+        leftInBuf.add(row);
+
+        flushFromBuffer();
+    }
+
+    /** */
+    private void pushRight(Row row) {
+        checkThread();
+
+        assert downstream != null;
+        assert waitingRight > 0;
+
+        waitingRight--;
+
+        rightMaterialized.add(row);
+
+        if (waitingRight == 0)
+            sources.get(1).request(waitingRight = IN_BUFFER_SIZE);
+    }
+
+    /** */
+    private void endLeft() {
+        checkThread();
+
+        assert downstream != null;
+        assert waitingLeft > 0;
+
+        waitingLeft = -1;
+
+        flushFromBuffer();
+    }
+
+    /** */
+    private void endRight() {
+        checkThread();
+
+        assert downstream != null;
+        assert waitingRight > 0;
+
+        waitingRight = -1;
+
+        flushFromBuffer();
+    }
+
+    /** */
+    private void onError(Throwable e) {
+        checkThread();
+
+        assert downstream != null;
+
+        downstream.onError(e);
+    }
+
+    /** */
+    private void flushFromBuffer() {
+        inLoop = true;
+        try {
+            if (waitingRight == -1) {
+                while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
+                    if (left == null)
+                        left = leftInBuf.remove();
+
+                    boolean matched = false;
+
+                    while (!matched && requested > 0 && rightIdx < rightMaterialized.size()) {
+                        Row row = handler.concat(left, rightMaterialized.get(rightIdx++));
+
+                        if (!cond.test(row))
+                            continue;
+
+                        requested--;
+                        matched = true;
+                        downstream.push(left);

Review comment:
       If I understand the semantics of a semi-join correctly, it should emit each row of the left input at most once, no matter how many matches that row has. For example, [here](https://sqlperformance.com/2018/02/sql-plan/row-goals-part-2-semi-joins) I see:
   
   > A semi join returns a row from one join input (A) if there is at least one matching row on the other join input (B).
   > The essential differences between a semi join and a regular join are:
   > - Semi join either returns each row from input A, or it does not. No row duplication can occur.
   > - Regular join duplicates rows if there are multiple matches on the join predicate.
   > - Semi join is defined to only return columns from input A.
   > - Regular join may return columns from either (or both) join inputs.
   
   So `rightMaterialized` can be a sort of set here.
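   For an equality condition, one way to get set semantics is to keep only the right-side join keys in a set. Keys such as `Integer` have value-based `hashCode()`/`equals()` even when rows do not. Below is a minimal hash semi-join sketch. It assumes the join key can be extracted from each row by a function. `HashSemiJoin` is hypothetical and ignores the push/request protocol of the real nodes:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/**
 * Hypothetical hash semi join for an equi-join condition. The right input is
 * reduced to a set of join keys, so each left row is emitted at most once.
 */
final class HashSemiJoin {
    private HashSemiJoin() {}

    static <L, R, K> List<L> semiJoin(List<L> left, Function<L, K> leftKey,
                                      List<R> right, Function<R, K> rightKey) {
        Set<K> rightKeys = new HashSet<>();
        for (R r : right) {
            K k = rightKey.apply(r);
            if (k != null) // NULL keys never satisfy an SQL equality predicate
                rightKeys.add(k);
        }

        List<L> res = new ArrayList<>();
        for (L l : left) {
            K k = leftKey.apply(l);
            if (k != null && rightKeys.contains(k))
                res.add(l); // emitted once, however many right rows share the key
        }
        return res;
    }
}
```

   With the test data plus the `QA` department, `Core` is returned once even though both Igor and Alexey match it, and `QA` is dropped.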




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org