You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2019/12/03 14:12:19 UTC

[ignite] branch ignite-12248 updated: simple execution

This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 4c8a246  simple execution
4c8a246 is described below

commit 4c8a24606642357eb08cf1957b88364b3f1f5826
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Tue Dec 3 17:12:03 2019 +0300

    simple execution
---
 .../java/org/apache/calcite/interpreter/Util.java  |  28 +++++
 .../query/calcite/exec/ConsumerNode.java           |  75 +++++++++++++
 .../processors/query/calcite/exec/JoinNode.java    |   2 +-
 .../processors/query/calcite/exec/ProjectNode.java |  47 ++++++++
 .../query/calcite/exec/ScalarFactory.java          | 121 +++++++++++++++++++++
 .../processors/query/calcite/exec/ScanNode.java    |  66 +++++++++++
 .../query/calcite/prepare/IgnitePlanner.java       |   8 +-
 .../query/calcite/schema/IgniteTable.java          |   6 +-
 .../query/calcite/exec/ExecutionTest.java          |  77 +++++++++++++
 .../ignite/testsuites/IgniteCalciteTestSuite.java  |   2 +
 10 files changed, 425 insertions(+), 7 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java b/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java
new file mode 100644
index 0000000..a4ccb83
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+
+/**
+ *
+ */
+public class Util {
+    public static Context createContext(DataContext ctx) {
+        return new Context(ctx);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
new file mode 100644
index 0000000..c2a6211
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ *
+ */
+public class ConsumerNode extends AbstractNode<Object[]> implements SingleNode<Object[]>, Sink<Object[]>, Iterator<Object[]> {
+    private static final int DEFAULT_BUFFER_SIZE = 1000;
+    private static final Object[] END = new Object[0];
+
+    private ArrayDeque<Object[]> buff;
+
+    protected ConsumerNode() {
+        super(Sink.noOp());
+
+        buff = new ArrayDeque<>(DEFAULT_BUFFER_SIZE);
+    }
+
+    @Override public Sink<Object[]> sink(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
+    }
+
+    @Override public boolean push(Object[] row) {
+        if (buff.size() == DEFAULT_BUFFER_SIZE)
+            return false;
+
+        buff.add(row);
+
+        return true;
+    }
+
+    @Override public void end() {
+        buff.add(END);
+    }
+
+    @Override public boolean hasNext() {
+        if (buff.isEmpty())
+            signal();
+
+        return buff.peek() != END;
+    }
+
+    @Override public Object[] next() {
+        if (buff.isEmpty())
+            signal();
+
+        if(!hasNext())
+            throw new NoSuchElementException();
+
+        return Objects.requireNonNull(buff.poll());
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java
index 32818e1..5188148 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java
@@ -22,7 +22,7 @@ import java.util.function.BiFunction;
 /**
  *
  */
-public class JoinNode extends AbstractNode {
+public class JoinNode extends AbstractNode<Object[]> {
     private final BiFunction<Object[], Object[], Object[]> expression;
     private final ArraySink<Object[]> left;
     private final ArraySink<Object[]> right;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ProjectNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ProjectNode.java
new file mode 100644
index 0000000..d7b7e57
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ProjectNode.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.function.Function;
+
+/**
+ *
+ */
+public class ProjectNode extends AbstractNode<Object[]> implements SingleNode<Object[]>, Sink<Object[]> {
+    private final Function<Object[], Object[]> projection;
+
+    public ProjectNode(Sink<Object[]> target, Function<Object[], Object[]> projection) {
+        super(target);
+
+        this.projection = projection;
+    }
+
+    @Override public Sink<Object[]> sink(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
+    }
+
+    @Override public boolean push(Object[] row) {
+        return target.push(projection.apply(row));
+    }
+
+    @Override public void end() {
+        target.end();
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java
new file mode 100644
index 0000000..62847ac
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.JaninoRexCompiler;
+import org.apache.calcite.interpreter.Scalar;
+import org.apache.calcite.interpreter.Util;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ *
+ */
+public class ScalarFactory {
+    private final JaninoRexCompiler rexCompiler;
+    private final RexBuilder builder;
+
+    public ScalarFactory(RexBuilder builder) {
+        rexCompiler = new JaninoRexCompiler(builder);
+        this.builder = builder;
+    }
+
+    public <T> Predicate<T> filterPredicate(DataContext root, RexNode filter, RelDataType rowType) {
+        Scalar scalar = rexCompiler.compile(ImmutableList.of(filter), rowType);
+        Context ctx = Util.createContext(root);
+
+        return new FilterPredicate<>(ctx, scalar);
+    }
+
+    public <T> BiFunction<T, T, T> joinExpression(DataContext root, RexNode expression, RelDataType leftType, RelDataType rightType) {
+        RelDataType rowType = combinedType(leftType, rightType);
+
+        Scalar scalar = rexCompiler.compile(ImmutableList.of(expression), rowType);
+        Context ctx = Util.createContext(root);
+        ctx.values = new Object[rowType.getFieldCount()];
+
+        return new JoinExpression<>(ctx, scalar);
+    }
+
+    private RelDataType combinedType(RelDataType... types) {
+        RelDataTypeFactory.Builder typeBuilder = new RelDataTypeFactory.Builder(builder.getTypeFactory());
+
+        for (RelDataType type : types)
+            typeBuilder.addAll(type.getFieldList());
+
+        return typeBuilder.build();
+    }
+
+    private static class FilterPredicate<T> implements Predicate<T> {
+        private final Context ctx;
+        private final Scalar scalar;
+        private final Object[] vals;
+
+        private FilterPredicate(Context ctx, Scalar scalar) {
+            this.ctx = ctx;
+            this.scalar = scalar;
+
+            vals = new Object[1];
+        }
+
+        @Override public boolean test(T r) {
+            ctx.values = (Object[]) r;
+            scalar.execute(ctx, vals);
+            return (Boolean) vals[0];
+        }
+    }
+
+    private static class JoinExpression<T> implements BiFunction<T, T, T> {
+        private final Object[] vals;
+        private final Context ctx;
+        private final Scalar scalar;
+
+        private Object[] left0;
+
+        private JoinExpression(Context ctx, Scalar scalar) {
+            this.ctx = ctx;
+            this.scalar = scalar;
+
+            vals = new Object[1];
+        }
+
+        @Override public T apply(T left, T right) {
+            if (left0 != left) {
+                left0 = (Object[]) left;
+                System.arraycopy(left0, 0, ctx.values, 0, left0.length);
+            }
+
+            Object[] right0 = (Object[]) right;
+            System.arraycopy(right0, 0, ctx.values, left0.length, right0.length);
+
+            scalar.execute(ctx, vals);
+
+            if ((Boolean) vals[0])
+                return (T) Arrays.copyOf(ctx.values, ctx.values.length);
+
+            return null;
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
new file mode 100644
index 0000000..19edc8f
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ *
+ */
+public class ScanNode implements SingleNode<Object[]>, Source{
+    private static final Object[] END = new Object[0];
+
+    /** */
+    private final Sink<Object[]> target;
+    private final Iterator<Object[]> it;
+
+    private Object[] row;
+
+    protected ScanNode(Sink<Object[]> target, Iterator<Object[]> it) {
+        this.target = target;
+        this.it = it;
+    }
+
+    @Override public void signal() {
+        if (row == END)
+            return;
+
+        if (row != null && !target.push(row))
+            return;
+
+        row = null;
+
+        while (it.hasNext()) {
+            row = it.next();
+
+            if (!target.push(row))
+                return;
+        }
+
+        row = END;
+        target.end();
+    }
+
+    @Override public void sources(List<Source> sources) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override public Sink<Object[]> sink(int idx) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index 2506de8..dec15d4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.serialize.Graph;
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.GraphToRelConverter;
 import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelGraph;
+import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelToGraphConverter;
 import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
@@ -247,10 +248,13 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         return new Splitter().go((IgniteRel) rel);
     }
 
-    public Graph graph(RelNode node) {
+    public Graph graph(RelNode rel) {
         ready();
 
-        return null; // TODO
+        if (rel.getConvention() != IgniteRel.IGNITE_CONVENTION)
+            throw new IllegalArgumentException("IGNITE_CONVENTION is required.");
+
+        return new RelToGraphConverter().convert((IgniteRel) rel);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 805c455..3007a85 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -76,12 +76,10 @@ public class IgniteTable extends AbstractTable implements TranslatableTable {
     }
 
     public DistributionTrait distributionTrait(PlannerContext context) {
-        return Commons.plannerContext(context).distributionTrait(CU.cacheId(cacheName), rowType);
+        return context.distributionTrait(CU.cacheId(cacheName), rowType);
     }
 
     public FragmentInfo fragmentInfo(PlannerContext ctx) {
-        PlannerContext ctx0 = Commons.plannerContext(ctx);
-
-        return new FragmentInfo(ctx0.mapForCache(CU.cacheId(cacheName), ctx0.topologyVersion()));
+        return new FragmentInfo(ctx.mapForCache(CU.cacheId(cacheName), ctx.topologyVersion()));
     }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java
new file mode 100644
index 0000000..744280a
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class ExecutionTest extends GridCommonAbstractTest {
+    @Test
+    public void testSimpleExecution() {
+        // SELECT P.ID, P.NAME, PR.NAME AS PROJECT
+        // FROM PERSON P
+        // INNER JOIN PROJECT PR
+        // ON P.ID = PR.RESP_ID
+        // WHERE P.ID >= 2
+
+        ConsumerNode node = new ConsumerNode();
+
+        FilterNode filter = new FilterNode(node.sink(), r -> (Integer) r[0] >= 2);
+        node.source(filter);
+
+        ProjectNode project = new ProjectNode(filter.sink(), r -> new Object[]{r[0], r[1], r[5]});
+        filter.source(project);
+
+        JoinNode join = new JoinNode(project.sink(), (r1, r2) -> r1[0] != r2[1] ? null : new Object[]{r1[0], r1[1], r1[2], r2[0], r2[1], r2[2]});
+        project.source(join);
+
+        ScanNode persons = new ScanNode(join.sink(0), Arrays.asList(
+            new Object[]{0, "Igor", "Seliverstov"},
+            new Object[]{1, "Roman", "Kondakov"},
+            new Object[]{2, "Ivan", "Pavlukhin"},
+            new Object[]{3, "Alexey", "Goncharuk"}
+        ).iterator());
+
+        ScanNode projects = new ScanNode(join.sink(1), Arrays.asList(
+            new Object[]{0, 2, "Calcite"},
+            new Object[]{1, 1, "SQL"},
+            new Object[]{2, 2, "Ignite"},
+            new Object[]{3, 0, "Core"}
+        ).iterator());
+
+        join.sources(Arrays.asList(persons, projects));
+
+        assert node.hasNext();
+
+        ArrayList<Object[]> rows = new ArrayList<>();
+
+        while (node.hasNext()) {
+            rows.add(node.next());
+        }
+
+        assertEquals(2, rows.size());
+
+        Assert.assertArrayEquals(new Object[]{2, "Ivan", "Calcite"}, rows.get(0));
+        Assert.assertArrayEquals(new Object[]{2, "Ivan", "Ignite"}, rows.get(1));
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 51a7bab..21c2163 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
 import org.apache.ignite.internal.processors.query.calcite.exchange.OutboxTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -29,5 +30,6 @@ import org.junit.runners.Suite;
 @Suite.SuiteClasses({
     CalciteQueryProcessorTest.class,
     OutboxTest.class,
+    ExecutionTest.class
 })
 public class IgniteCalciteTestSuite { }