You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:20:56 UTC

[GitHub] [ignite] korlov42 commented on a change in pull request #7915: IGNITE-12868 Calcite integration. LEFT, RIGHT join support

korlov42 commented on a change in pull request #7915:
URL: https://github.com/apache/ignite/pull/7915#discussion_r437434688



##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
##########
@@ -151,6 +152,285 @@ public void testUnionAll() {
         assertEquals(12, res.size());
     }
 
+    /**
+     * Runs a left join of four employees with two departments through {@link LeftJoinNode} and checks that every
+     * employee is returned once, with a {@code null} department name for the employee without a department.
+     */
+    @Test
+    public void testLeftJoin() {
+        //    select e.id, e.name, d.name as dep_name
+        //      from emp e
+        // left join dep d
+        //        on e.depno = d.depno
+
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+
+        // Employee rows: id, name, depno.
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{0, "Igor", 1},
+            new Object[]{1, "Roman", 2},
+            new Object[]{2, "Ivan", null},
+            new Object[]{3, "Alexey", 1}
+        ));
+
+        // Department rows: depno, name.
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{1, "Core"},
+            new Object[]{2, "SQL"}
+        ));
+
+        LeftJoinNode<Object[]> join = new LeftJoinNode<>(
+            ctx,
+            // Compare keys by value: '==' on boxed integers only works for values inside the Integer cache.
+            // A null key never matches anything.
+            r -> r[2] != null && r[2].equals(r[3]),
+            new RowHandler.RowFactory<Object[]>() {
+                @Override public Object[] create() {
+                    return new Object[2];
+                }
+
+                @Override public Object[] create(Object... fields) {
+                    return create();
+                }
+            }
+        );
+        join.register(F.asList(persons, deps));
+
+        // Joined row layout: e.id, e.name, e.depno, d.depno, d.name.
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1], r[4]});
+        project.register(join);
+
+        RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+        node.register(project);
+
+        // JUnit assertion instead of the 'assert' keyword, which is skipped when the JVM runs without -ea.
+        Assert.assertTrue(node.hasNext());
+
+        ArrayList<Object[]> rows = new ArrayList<>();
+
+        while (node.hasNext())
+            rows.add(node.next());
+
+        assertEquals(4, rows.size());
+
+        Assert.assertArrayEquals(new Object[]{0, "Igor", "Core"}, rows.get(0));
+        Assert.assertArrayEquals(new Object[]{1, "Roman", "SQL"}, rows.get(1));
+        Assert.assertArrayEquals(new Object[]{2, "Ivan", null}, rows.get(2));
+        Assert.assertArrayEquals(new Object[]{3, "Alexey", "Core"}, rows.get(3));
+    }
+
+    /**
+     * Runs a right join of two departments with four employees through {@link RightJoinNode} and checks that every
+     * employee is returned once, with a {@code null} department name for the employee without a department.
+     */
+    @Test
+    public void testRightJoin() {
+        //     select e.id, e.name, d.name as dep_name
+        //       from dep d
+        // right join emp e
+        //         on e.depno = d.depno
+
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+
+        // Employee rows: id, name, depno.
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{0, "Igor", 1},
+            new Object[]{1, "Roman", 2},
+            new Object[]{2, "Ivan", null},
+            new Object[]{3, "Alexey", 1}
+        ));
+
+        // Department rows: depno, name.
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{1, "Core"},
+            new Object[]{2, "SQL"}
+        ));
+
+        RightJoinNode<Object[]> join = new RightJoinNode<>(
+            ctx,
+            // Compare keys by value: '==' on boxed integers only works for values inside the Integer cache.
+            // A null key never matches anything.
+            r -> r[0] != null && r[0].equals(r[4]),
+            new RowHandler.RowFactory<Object[]>() {
+                @Override public Object[] create() {
+                    return new Object[2];
+                }
+
+                @Override public Object[] create(Object... fields) {
+                    return create();
+                }
+            }
+        );
+        join.register(F.asList(deps, persons));
+
+        // Joined row layout: d.depno, d.name, e.id, e.name, e.depno.
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[2], r[3], r[1]});
+        project.register(join);
+
+        RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+        node.register(project);
+
+        // JUnit assertion instead of the 'assert' keyword, which is skipped when the JVM runs without -ea.
+        Assert.assertTrue(node.hasNext());
+
+        ArrayList<Object[]> rows = new ArrayList<>();
+
+        while (node.hasNext())
+            rows.add(node.next());
+
+        assertEquals(4, rows.size());
+
+        Assert.assertArrayEquals(new Object[]{0, "Igor", "Core"}, rows.get(0));
+        Assert.assertArrayEquals(new Object[]{1, "Roman", "SQL"}, rows.get(1));
+        Assert.assertArrayEquals(new Object[]{2, "Ivan", null}, rows.get(2));
+        Assert.assertArrayEquals(new Object[]{3, "Alexey", "Core"}, rows.get(3));
+    }
+
+    /**
+     * Runs a full outer join of four employees with three departments through {@link FullOuterJoinNode} and checks
+     * that every employee is returned once and that the department without employees is returned with {@code null}
+     * employee columns.
+     */
+    @Test
+    public void testFullOuterJoin() {
+        //          select e.id, e.name, d.name as dep_name
+        //            from emp e
+        // full outer join dep d
+        //              on e.depno = d.depno
+
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+
+        // Employee rows: id, name, depno.
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{0, "Igor", 1},
+            new Object[]{1, "Roman", 2},
+            new Object[]{2, "Ivan", null},
+            new Object[]{3, "Alexey", 1}
+        ));
+
+        // Department rows: depno, name.
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{1, "Core"},
+            new Object[]{2, "SQL"},
+            new Object[]{3, "QA"}
+        ));
+
+        FullOuterJoinNode<Object[]> join = new FullOuterJoinNode<>(
+            ctx,
+            // Compare keys by value: '==' on boxed integers only works for values inside the Integer cache.
+            // A null key never matches anything.
+            r -> r[2] != null && r[2].equals(r[3]),
+            // Factory producing an empty three-column (employee-shaped) row.
+            new RowHandler.RowFactory<Object[]>() {
+                @Override public Object[] create() {
+                    return new Object[3];
+                }
+
+                @Override public Object[] create(Object... fields) {
+                    return create();
+                }
+            },
+            // Factory producing an empty two-column (department-shaped) row.
+            new RowHandler.RowFactory<Object[]>() {
+                @Override public Object[] create() {
+                    return new Object[2];
+                }
+
+                @Override public Object[] create(Object... fields) {
+                    return create();
+                }
+            }
+        );
+        join.register(F.asList(persons, deps));
+
+        // Joined row layout: e.id, e.name, e.depno, d.depno, d.name.
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new Object[]{r[0], r[1], r[4]});
+        project.register(join);
+
+        RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+        node.register(project);
+
+        // JUnit assertion instead of the 'assert' keyword, which is skipped when the JVM runs without -ea.
+        Assert.assertTrue(node.hasNext());
+
+        ArrayList<Object[]> rows = new ArrayList<>();
+
+        while (node.hasNext())
+            rows.add(node.next());
+
+        assertEquals(5, rows.size());
+
+        Assert.assertArrayEquals(new Object[]{0, "Igor", "Core"}, rows.get(0));
+        Assert.assertArrayEquals(new Object[]{1, "Roman", "SQL"}, rows.get(1));
+        Assert.assertArrayEquals(new Object[]{2, "Ivan", null}, rows.get(2));
+        Assert.assertArrayEquals(new Object[]{3, "Alexey", "Core"}, rows.get(3));
+        Assert.assertArrayEquals(new Object[]{null, null, "QA"}, rows.get(4));
+    }
+
+    /** */
+    @Test
+    public void testSemiJoin() {
+        //    select d.name as dep_name
+        //      from dep d
+        // semi join emp e
+        //        on e.depno = d.depno
+
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+
+        ScanNode<Object[]> persons = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{0, "Igor", 1},
+            new Object[]{1, "Roman", 2},
+            new Object[]{2, "Ivan", null},
+            new Object[]{3, "Alexey", 1}
+        ));
+
+        ScanNode<Object[]> deps = new ScanNode<>(ctx, Arrays.asList(
+            new Object[]{1, "Core"},
+            new Object[]{2, "SQL"}

Review comment:
       Yep, sure. Fixed.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AntiJoinNode.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** */
+public class AntiJoinNode<Row> extends AbstractNode<Row> {
+    /** Join expression. */
+    private final Predicate<Row> cond;
+
+    /** Row handler obtained from the execution context. */
+    private final RowHandler<Row> handler;
+
+    /**
+     * Row count passed to the most recent {@link #request(int)} call; that method asserts it is zero before
+     * accepting a new request. NOTE(review): presumably decremented as rows are emitted - not visible here.
+     */
+    private int requested;
+
+    /** Decremented for every row pushed by the left source; set to {@code -1} once the left source has ended. */
+    private int waitingLeft;
+
+    /**
+     * Decremented for every row pushed by the right source; when it reaches zero another batch is requested.
+     * Set to {@code -1} once the right source has ended.
+     */
+    private int waitingRight;
+
+    /** Rows received from the right source so far. */
+    private final List<Row> rightMaterialized = new ArrayList<>(IN_BUFFER_SIZE);
+
+    /** Buffer of rows received from the left source. */
+    private final Deque<Row> leftInBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
+
+    /** Set on entry to the flush routine; {@link #request(int)} does not schedule a new flush while it is set. */
+    private boolean inLoop;
+
+    /** NOTE(review): presumably the left row currently being processed - usage is not visible here. */
+    private Row left;
+
+    /** NOTE(review): presumably the current position within the right rows - usage is not visible here. */
+    private int rightIdx;
+
+    /**
+     * Creates the node, remembering the join expression and taking the row handler from the context.
+     *
+     * @param ctx Execution context.
+     * @param cond Join expression.
+     */
+    public AntiJoinNode(ExecutionContext<Row> ctx, Predicate<Row> cond) {
+        super(ctx);
+
+        handler = ctx.rowHandler();
+        this.cond = cond;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) {
+        checkThread();
+
+        // Exactly two sources (left and right) must be registered.
+        assert !F.isEmpty(sources);
+        assert sources.size() == 2;
+
+        // A new request is accepted only when the previous one is fully served.
+        assert rowsCnt > 0;
+        assert requested == 0;
+
+        requested = rowsCnt;
+
+        // Do not schedule another flush while one is already in progress.
+        if (inLoop)
+            return;
+
+        context().execute(this::flushFromBuffer);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Row> requestDownstream(int idx) {
+        if (idx == 0)
+            return new Downstream<Row>() {
+                /** {@inheritDoc} */
+                @Override public void push(Row row) {
+                    pushLeft(row);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void end() {
+                    endLeft();
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onError(Throwable e) {
+                    AntiJoinNode.this.onError(e);
+                }
+            };
+        else if (idx == 1)
+            return new Downstream<Row>() {
+                /** {@inheritDoc} */
+                @Override public void push(Row row) {
+                    pushRight(row);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void end() {
+                    endRight();
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onError(Throwable e) {
+                    AntiJoinNode.this.onError(e);
+                }
+            };
+
+        throw new IndexOutOfBoundsException();
+    }
+
+    /**
+     * Accepts a row from the left source: buffers it and runs the flush routine.
+     *
+     * @param row Row pushed by the left source.
+     */
+    private void pushLeft(Row row) {
+        checkThread();
+
+        assert downstream != null;
+        assert waitingLeft > 0;
+
+        // One less row is awaited from the left source.
+        --waitingLeft;
+        leftInBuf.addLast(row);
+
+        flushFromBuffer();
+    }
+
+    /**
+     * Accepts a row from the right source: stores it and, once the current batch is fully received,
+     * requests the next batch.
+     *
+     * @param row Row pushed by the right source.
+     */
+    private void pushRight(Row row) {
+        checkThread();
+
+        assert downstream != null;
+        assert waitingRight > 0;
+
+        --waitingRight;
+        rightMaterialized.add(row);
+
+        // Rows of the current batch are still in flight.
+        if (waitingRight != 0)
+            return;
+
+        waitingRight = IN_BUFFER_SIZE;
+        sources.get(1).request(waitingRight);
+    }
+
+    /** Handles the end of the left source: marks it as finished and runs the flush routine. */
+    private void endLeft() {
+        checkThread();
+
+        assert downstream != null;
+        assert waitingLeft > 0;
+
+        // -1 marks the left source as finished.
+        waitingLeft = -1;
+        flushFromBuffer();
+    }
+
+    /** Handles the end of the right source: marks it as finished and runs the flush routine. */
+    private void endRight() {
+        checkThread();
+
+        assert downstream != null;
+        assert waitingRight > 0;
+
+        // -1 marks the right source as finished.
+        waitingRight = -1;
+        flushFromBuffer();
+    }
+
+    /**
+     * Forwards an error reported by either source to the downstream.
+     *
+     * @param e Error to forward.
+     */
+    private void onError(Throwable e) {
+        checkThread();
+
+        assert downstream != null;
+
+        downstream.onError(e);
+    }
+
+    /** */
+    private void flushFromBuffer() {
+        inLoop = true;
+        try {
+            if (waitingRight == -1) {

Review comment:
       Good point. Fixed.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FullOuterJoinNode.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** */
+public class FullOuterJoinNode<Row> extends AbstractNode<Row> {
+    /** Left row factory. */
+    private final RowHandler.RowFactory<Row> leftRowFactory;
+
+    /** Right row factory. */
+    private final RowHandler.RowFactory<Row> rightRowFactory;
+
+    /** Whether current left row was matched or not. */
+    private boolean leftMatched;
+
+    /**
+     * Right rows that were not matched. Rows are tracked by reference (identity hash code and reference equality),
+     * so two distinct rows with equal content are never collapsed into one entry, whatever {@code equals} and
+     * {@code hashCode} the row type defines.
+     */
+    private final Set<Row> rightNotMatched = Collections.newSetFromMap(new IdentityHashMap<>());

Review comment:
       Actually, that is exactly what I relied on: the identity hash code and reference equality should be used here. Anyway, I have just rewritten this part with a BitSet, so this is fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org