You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2019/12/03 14:12:19 UTC
[ignite] branch ignite-12248 updated: simple execution
This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 4c8a246 simple execution
4c8a246 is described below
commit 4c8a24606642357eb08cf1957b88364b3f1f5826
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Tue Dec 3 17:12:03 2019 +0300
simple execution
---
.../java/org/apache/calcite/interpreter/Util.java | 28 +++++
.../query/calcite/exec/ConsumerNode.java | 75 +++++++++++++
.../processors/query/calcite/exec/JoinNode.java | 2 +-
.../processors/query/calcite/exec/ProjectNode.java | 47 ++++++++
.../query/calcite/exec/ScalarFactory.java | 121 +++++++++++++++++++++
.../processors/query/calcite/exec/ScanNode.java | 66 +++++++++++
.../query/calcite/prepare/IgnitePlanner.java | 8 +-
.../query/calcite/schema/IgniteTable.java | 6 +-
.../query/calcite/exec/ExecutionTest.java | 77 +++++++++++++
.../ignite/testsuites/IgniteCalciteTestSuite.java | 2 +
10 files changed, 425 insertions(+), 7 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java b/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java
new file mode 100644
index 0000000..a4ccb83
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/calcite/interpreter/Util.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.interpreter;
+
+import org.apache.calcite.DataContext;
+
+/**
+ * Helper declared inside the Calcite interpreter package that creates interpreter {@link Context} instances.
+ */
+public class Util {
+    /**
+     * Wraps a data context into an interpreter context.
+     *
+     * @param ctx Data context handed to the {@link Context} constructor.
+     * @return Newly created context.
+     */
+    public static Context createContext(DataContext ctx) {
+        Context res = new Context(ctx);
+
+        return res;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
new file mode 100644
index 0000000..c2a6211
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Terminal node of an execution tree. Rows pushed into it are buffered and handed out through the
+ * {@link Iterator} interface.
+ */
+public class ConsumerNode extends AbstractNode<Object[]> implements SingleNode<Object[]>, Sink<Object[]>, Iterator<Object[]> {
+    /** Number of buffered entries at which {@link #push(Object[])} starts rejecting rows. */
+    private static final int DEFAULT_BUFFER_SIZE = 1000;
+
+    /** End-of-data marker; always compared by reference. */
+    private static final Object[] END = new Object[0];
+
+    /** Rows received but not yet returned by {@link #next()}; {@link #END} is appended by {@link #end()}. */
+    private final ArrayDeque<Object[]> buff;
+
+    /** Creates a consumer with a no-op downstream sink and an empty buffer. */
+    protected ConsumerNode() {
+        super(Sink.noOp());
+
+        buff = new ArrayDeque<>(DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * @param idx Input index; only {@code 0} is valid.
+     * @return This node, which acts as its own input sink.
+     * @throws IndexOutOfBoundsException If {@code idx} is not {@code 0}.
+     */
+    @Override public Sink<Object[]> sink(int idx) {
+        if (idx != 0)
+            throw new IndexOutOfBoundsException();
+
+        return this;
+    }
+
+    /**
+     * Buffers a row.
+     *
+     * @param row Row to buffer.
+     * @return {@code false} if the buffer already holds {@link #DEFAULT_BUFFER_SIZE} entries and the row
+     *      was not stored, {@code true} if the row was stored.
+     */
+    @Override public boolean push(Object[] row) {
+        if (buff.size() == DEFAULT_BUFFER_SIZE)
+            return false;
+
+        buff.add(row);
+
+        return true;
+    }
+
+    /** Appends the end-of-data marker to the buffer. */
+    @Override public void end() {
+        buff.add(END);
+    }
+
+    /**
+     * Calls {@code signal()} when the buffer is empty and then inspects the head of the buffer.
+     *
+     * @return {@code true} only if the head of the buffer is a data row; {@code false} if the head is the
+     *      end-of-data marker or the buffer is still empty.
+     */
+    @Override public boolean hasNext() {
+        if (buff.isEmpty())
+            signal();
+
+        // peek() returns null for an empty deque. Treating that as "has next" would make next()
+        // fail with NullPointerException instead of NoSuchElementException.
+        Object[] head = buff.peek();
+
+        return head != null && head != END;
+    }
+
+    /**
+     * @return Next buffered row.
+     * @throws NoSuchElementException If {@link #hasNext()} reports that no row is available.
+     */
+    @Override public Object[] next() {
+        if (buff.isEmpty())
+            signal();
+
+        if (!hasNext())
+            throw new NoSuchElementException();
+
+        return Objects.requireNonNull(buff.poll());
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java
index 32818e1..5188148 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/JoinNode.java
@@ -22,7 +22,7 @@ import java.util.function.BiFunction;
/**
*
*/
-public class JoinNode extends AbstractNode {
+public class JoinNode extends AbstractNode<Object[]> {
private final BiFunction<Object[], Object[], Object[]> expression;
private final ArraySink<Object[]> left;
private final ArraySink<Object[]> right;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ProjectNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ProjectNode.java
new file mode 100644
index 0000000..d7b7e57
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ProjectNode.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.function.Function;
+
+/**
+ * Execution node that maps each incoming row through a projection function and passes the result to the
+ * target sink.
+ */
+public class ProjectNode extends AbstractNode<Object[]> implements SingleNode<Object[]>, Sink<Object[]> {
+    /** Function producing the output row for an input row. */
+    private final Function<Object[], Object[]> projection;
+
+    /**
+     * @param target Sink receiving projected rows.
+     * @param projection Function applied to every pushed row.
+     */
+    public ProjectNode(Sink<Object[]> target, Function<Object[], Object[]> projection) {
+        super(target);
+
+        this.projection = projection;
+    }
+
+    /**
+     * @param idx Input index; only {@code 0} is accepted.
+     * @return This node acting as the sink for its input.
+     * @throws IndexOutOfBoundsException If {@code idx} is not {@code 0}.
+     */
+    @Override public Sink<Object[]> sink(int idx) {
+        if (idx == 0)
+            return this;
+
+        throw new IndexOutOfBoundsException();
+    }
+
+    /**
+     * Applies the projection to the row and offers the result to the target.
+     *
+     * @param row Input row.
+     * @return Value returned by the target's {@code push} for the projected row.
+     */
+    @Override public boolean push(Object[] row) {
+        Object[] projected = projection.apply(row);
+
+        return target.push(projected);
+    }
+
+    /** Propagates the end-of-data notification to the target. */
+    @Override public void end() {
+        target.end();
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java
new file mode 100644
index 0000000..62847ac
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.JaninoRexCompiler;
+import org.apache.calcite.interpreter.Scalar;
+import org.apache.calcite.interpreter.Util;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * Factory that compiles Calcite row expressions into Java functional objects operating on {@code Object[]} rows.
+ */
+public class ScalarFactory {
+    /** Compiler turning {@link RexNode} expressions into {@link Scalar} instances. */
+    private final JaninoRexCompiler rexCompiler;
+
+    /** Rex builder; its type factory is used to build combined row types. */
+    private final RexBuilder builder;
+
+    /**
+     * @param builder Rex builder passed to the {@link JaninoRexCompiler} and used as the type factory source.
+     */
+    public ScalarFactory(RexBuilder builder) {
+        rexCompiler = new JaninoRexCompiler(builder);
+        this.builder = builder;
+    }
+
+    /**
+     * Compiles a filter condition into a predicate over rows.
+     *
+     * @param root Data context the evaluation context is created from.
+     * @param filter Condition to compile.
+     * @param rowType Row type the condition is compiled against.
+     * @return Predicate that casts its argument to {@code Object[]} and accepts it only when the condition
+     *      evaluates to {@code TRUE}.
+     */
+    public <T> Predicate<T> filterPredicate(DataContext root, RexNode filter, RelDataType rowType) {
+        Scalar scalar = rexCompiler.compile(ImmutableList.of(filter), rowType);
+        Context ctx = Util.createContext(root);
+
+        return new FilterPredicate<>(ctx, scalar);
+    }
+
+    /**
+     * Compiles a join condition into a function over a pair of rows.
+     *
+     * @param root Data context the evaluation context is created from.
+     * @param expression Join condition, compiled against the concatenation of both row types.
+     * @param leftType Row type of the left input.
+     * @param rightType Row type of the right input.
+     * @return Function returning a copy of the concatenated row when the condition evaluates to {@code TRUE}
+     *      and {@code null} otherwise.
+     */
+    public <T> BiFunction<T, T, T> joinExpression(DataContext root, RexNode expression, RelDataType leftType, RelDataType rightType) {
+        RelDataType rowType = combinedType(leftType, rightType);
+
+        Scalar scalar = rexCompiler.compile(ImmutableList.of(expression), rowType);
+        Context ctx = Util.createContext(root);
+
+        // Scratch row reused by the returned function for every evaluated pair.
+        ctx.values = new Object[rowType.getFieldCount()];
+
+        return new JoinExpression<>(ctx, scalar);
+    }
+
+    /**
+     * @param types Row types to concatenate, in order.
+     * @return Row type holding the fields of all given types.
+     */
+    private RelDataType combinedType(RelDataType... types) {
+        RelDataTypeFactory.Builder typeBuilder = new RelDataTypeFactory.Builder(builder.getTypeFactory());
+
+        for (RelDataType type : types)
+            typeBuilder.addAll(type.getFieldList());
+
+        return typeBuilder.build();
+    }
+
+    /** Predicate backed by a compiled scalar; it keeps mutable evaluation state in its fields. */
+    private static class FilterPredicate<T> implements Predicate<T> {
+        /** Evaluation context; its {@code values} field is pointed at the tested row. */
+        private final Context ctx;
+
+        /** Compiled condition. */
+        private final Scalar scalar;
+
+        /** Single-slot array the scalar writes its result into. */
+        private final Object[] vals;
+
+        /**
+         * @param ctx Evaluation context.
+         * @param scalar Compiled condition.
+         */
+        private FilterPredicate(Context ctx, Scalar scalar) {
+            this.ctx = ctx;
+            this.scalar = scalar;
+
+            vals = new Object[1];
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean test(T r) {
+            ctx.values = (Object[]) r;
+            scalar.execute(ctx, vals);
+
+            // A NULL (unknown) result rejects the row; unboxing it would throw NullPointerException.
+            return Boolean.TRUE.equals(vals[0]);
+        }
+    }
+
+    /** Join function backed by a compiled scalar; it keeps mutable evaluation state in its fields. */
+    private static class JoinExpression<T> implements BiFunction<T, T, T> {
+        /** Single-slot array the scalar writes its result into. */
+        private final Object[] vals;
+
+        /** Evaluation context; its {@code values} array holds the concatenated row. */
+        private final Context ctx;
+
+        /** Compiled condition. */
+        private final Scalar scalar;
+
+        /** Left row most recently copied into the context, compared by reference. */
+        private Object[] left0;
+
+        /**
+         * @param ctx Evaluation context with a pre-allocated {@code values} array.
+         * @param scalar Compiled condition.
+         */
+        private JoinExpression(Context ctx, Scalar scalar) {
+            this.ctx = ctx;
+            this.scalar = scalar;
+
+            vals = new Object[1];
+        }
+
+        /**
+         * @param left Left row.
+         * @param right Right row.
+         * @return Copy of the concatenated row if the condition evaluates to {@code TRUE}, otherwise {@code null}.
+         */
+        @SuppressWarnings("unchecked") // T is only ever used as Object[] here, see the casts below.
+        @Override public T apply(T left, T right) {
+            // The left part is copied only when a different left row instance arrives.
+            if (left0 != left) {
+                left0 = (Object[]) left;
+                System.arraycopy(left0, 0, ctx.values, 0, left0.length);
+            }
+
+            Object[] right0 = (Object[]) right;
+            System.arraycopy(right0, 0, ctx.values, left0.length, right0.length);
+
+            scalar.execute(ctx, vals);
+
+            // A NULL (unknown) result means "no match"; unboxing it would throw NullPointerException.
+            if (Boolean.TRUE.equals(vals[0]))
+                return (T) Arrays.copyOf(ctx.values, ctx.values.length);
+
+            return null;
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
new file mode 100644
index 0000000..19edc8f
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Leaf node that feeds rows taken from an iterator into a target sink.
+ */
+public class ScanNode implements SingleNode<Object[]>, Source {
+    /** Marker stored in {@link #row} after the iterator has been drained; compared by reference. */
+    private static final Object[] END = new Object[0];
+
+    /** Sink the rows are pushed to. */
+    private final Sink<Object[]> target;
+
+    /** Supplier of rows. */
+    private final Iterator<Object[]> it;
+
+    /** Last row offered to the target, or {@link #END} when the scan is finished. */
+    private Object[] row;
+
+    /**
+     * @param target Sink the rows are pushed to.
+     * @param it Iterator supplying the rows.
+     */
+    protected ScanNode(Sink<Object[]> target, Iterator<Object[]> it) {
+        this.target = target;
+        this.it = it;
+    }
+
+    /**
+     * Pushes rows to the target until the target rejects one or the iterator runs out. A rejected row is
+     * kept and offered again first on the next call. Once the iterator is drained, the target's
+     * {@code end()} is invoked and later calls return immediately.
+     */
+    @Override public void signal() {
+        if (row == END)
+            return;
+
+        // Retry the row kept from the previous call, if there is one.
+        boolean stillRejected = row != null && !target.push(row);
+
+        if (stillRejected)
+            return;
+
+        row = null;
+
+        while (it.hasNext()) {
+            Object[] cur = it.next();
+
+            // Remember the row before offering it so that it can be re-offered after a rejection.
+            row = cur;
+
+            boolean accepted = target.push(cur);
+
+            if (!accepted)
+                return;
+        }
+
+        row = END;
+        target.end();
+    }
+
+    /**
+     * Not supported by this node.
+     *
+     * @throws UnsupportedOperationException Always.
+     */
+    @Override public void sources(List<Source> sources) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Not supported by this node.
+     *
+     * @throws UnsupportedOperationException Always.
+     */
+    @Override public Sink<Object[]> sink(int idx) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index 2506de8..dec15d4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
import org.apache.ignite.internal.processors.query.calcite.serialize.Graph;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.GraphToRelConverter;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelGraph;
+import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelToGraphConverter;
import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
@@ -247,10 +248,13 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
return new Splitter().go((IgniteRel) rel);
}
- public Graph graph(RelNode node) {
+ public Graph graph(RelNode rel) {
ready();
- return null; // TODO
+ if (rel.getConvention() != IgniteRel.IGNITE_CONVENTION)
+ throw new IllegalArgumentException("IGNITE_CONVENTION is required.");
+
+ return new RelToGraphConverter().convert((IgniteRel) rel);
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 805c455..3007a85 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -76,12 +76,10 @@ public class IgniteTable extends AbstractTable implements TranslatableTable {
}
public DistributionTrait distributionTrait(PlannerContext context) {
- return Commons.plannerContext(context).distributionTrait(CU.cacheId(cacheName), rowType);
+ return context.distributionTrait(CU.cacheId(cacheName), rowType);
}
public FragmentInfo fragmentInfo(PlannerContext ctx) {
- PlannerContext ctx0 = Commons.plannerContext(ctx);
-
- return new FragmentInfo(ctx0.mapForCache(CU.cacheId(cacheName), ctx0.topologyVersion()));
+ return new FragmentInfo(ctx.mapForCache(CU.cacheId(cacheName), ctx.topologyVersion()));
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java
new file mode 100644
index 0000000..744280a
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that a hand-assembled tree of execution nodes produces the expected rows.
+ */
+public class ExecutionTest extends GridCommonAbstractTest {
+    /**
+     * Wires consumer, filter, project, join and two scan nodes over in-memory rows and verifies the rows
+     * read from the consumer.
+     */
+    @Test
+    public void testSimpleExecution() {
+        // SELECT P.ID, P.NAME, PR.NAME AS PROJECT
+        // FROM PERSON P
+        // INNER JOIN PROJECT PR
+        // ON P.ID = PR.RESP_ID
+        // WHERE P.ID >= 2
+
+        ConsumerNode node = new ConsumerNode();
+
+        FilterNode filter = new FilterNode(node.sink(), r -> (Integer) r[0] >= 2);
+        node.source(filter);
+
+        ProjectNode project = new ProjectNode(filter.sink(), r -> new Object[]{r[0], r[1], r[5]});
+        filter.source(project);
+
+        // Keys are boxed Integers: compare them by value. Reference comparison (!=) only works while
+        // the values happen to fall into the Integer cache.
+        JoinNode join = new JoinNode(project.sink(), (r1, r2) -> !r1[0].equals(r2[1]) ? null : new Object[]{r1[0], r1[1], r1[2], r2[0], r2[1], r2[2]});
+        project.source(join);
+
+        ScanNode persons = new ScanNode(join.sink(0), Arrays.asList(
+            new Object[]{0, "Igor", "Seliverstov"},
+            new Object[]{1, "Roman", "Kondakov"},
+            new Object[]{2, "Ivan", "Pavlukhin"},
+            new Object[]{3, "Alexey", "Goncharuk"}
+        ).iterator());
+
+        ScanNode projects = new ScanNode(join.sink(1), Arrays.asList(
+            new Object[]{0, 2, "Calcite"},
+            new Object[]{1, 1, "SQL"},
+            new Object[]{2, 2, "Ignite"},
+            new Object[]{3, 0, "Core"}
+        ).iterator());
+
+        join.sources(Arrays.asList(persons, projects));
+
+        // A JUnit assertion is used instead of the assert keyword so the check runs even without -ea.
+        Assert.assertTrue(node.hasNext());
+
+        ArrayList<Object[]> rows = new ArrayList<>();
+
+        while (node.hasNext())
+            rows.add(node.next());
+
+        assertEquals(2, rows.size());
+
+        Assert.assertArrayEquals(new Object[]{2, "Ivan", "Calcite"}, rows.get(0));
+        Assert.assertArrayEquals(new Object[]{2, "Ivan", "Ignite"}, rows.get(1));
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 51a7bab..21c2163 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
import org.apache.ignite.internal.processors.query.calcite.exchange.OutboxTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -29,5 +30,6 @@ import org.junit.runners.Suite;
@Suite.SuiteClasses({
CalciteQueryProcessorTest.class,
OutboxTest.class,
+ ExecutionTest.class
})
public class IgniteCalciteTestSuite { }