You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/06/09 09:14:43 UTC
[ignite-3] branch main updated: IGNITE-14834 Move mapping and
splitter related code to Ignite 3.0 (#166)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 67e8ede IGNITE-14834 Move mapping and splitter related code to Ignite 3.0 (#166)
67e8ede is described below
commit 67e8ede31e812b38feed1838db34bf9e18328cf1
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Wed Jun 9 12:14:37 2021 +0300
IGNITE-14834 Move mapping and splitter related code to Ignite 3.0 (#166)
---
modules/calcite/pom.xml | 5 +
.../internal/processors/query/calcite/Stubs.java | 5 +
.../{Stubs.java => externalize/RelInputEx.java} | 30 +-
.../query/calcite/externalize/RelJson.java | 918 ++++++++++
.../query/calcite/externalize/RelJsonReader.java | 330 ++++
.../query/calcite/externalize/RelJsonWriter.java | 168 ++
.../query/calcite/metadata/ColocationGroup.java | 230 +++
.../ColocationMappingException.java} | 25 +-
.../query/calcite/metadata/FragmentMapping.java | 147 ++
.../calcite/metadata/FragmentMappingException.java | 58 +
.../calcite/metadata/IgniteMdFragmentMapping.java | 226 +++
.../query/calcite/metadata/IgniteMetadata.java | 24 +
.../query/calcite/metadata/MappingService.java | 41 +
.../NodeMappingException.java} | 35 +-
.../query/calcite/metadata/RelMetadataQueryEx.java | 28 +
.../calcite/prepare/AbstractMultiStepPlan.java | 101 ++
.../processors/query/calcite/prepare/Cloner.java | 257 +++
.../{Stubs.java => prepare/ExecutionPlan.java} | 32 +-
.../processors/query/calcite/prepare/Fragment.java | 193 ++
.../query/calcite/prepare/FragmentSplitter.java | 144 ++
.../{Stubs.java => prepare/IdGenerator.java} | 21 +-
.../query/calcite/prepare/MultiStepPlan.java | 54 +
.../MultiStepQueryPlan.java} | 29 +-
.../query/calcite/prepare/PlanningContext.java | 31 +-
.../calcite/{Stubs.java => prepare/QueryPlan.java} | 31 +-
.../query/calcite/prepare/QueryTemplate.java | 136 ++
.../processors/query/calcite/prepare/Splitter.java | 125 ++
.../query/calcite/rel/AbstractIgniteSpool.java | 47 +
.../query/calcite/rel/IgniteHashIndexSpool.java | 4 +-
.../query/calcite/rel/IgniteMergeJoin.java | 5 +-
.../query/calcite/rel/IgniteSortedIndexSpool.java | 4 +-
.../query/calcite/rel/IgniteTableFunctionScan.java | 11 +
.../query/calcite/rel/IgniteTableModify.java | 2 +-
.../query/calcite/rel/IgniteTableSpool.java | 5 +-
.../query/calcite/schema/IgniteTable.java | 10 +
.../query/calcite/util/IgniteMethod.java | 50 +
.../query/calcite/planner/AbstractPlannerTest.java | 96 +
.../query/calcite/planner/PlannerTest.java | 1847 ++++++++++++++++++++
.../apache/ignite/internal/util/ArrayUtils.java | 56 +
.../ignite/internal/util/IgniteIntIterator.java} | 28 +-
.../apache/ignite/internal/util/IgniteIntList.java | 526 ++++++
.../apache/ignite/internal/util/IgniteUtils.java | 82 +
.../internal/testframework/IgniteTestUtils.java | 116 ++
43 files changed, 6194 insertions(+), 119 deletions(-)
diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml
index 8bce051..c44ae05 100644
--- a/modules/calcite/pom.xml
+++ b/modules/calcite/pom.xml
@@ -46,6 +46,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-network-api</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
</dependency>
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
index 01746a7..ce7d926 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
@@ -24,6 +24,11 @@ public class Stubs {
}
/** */
+ public static long longFoo(Object... args) {
+ return args == null ? 0 : args.length;
+ }
+
+ /** */
public static boolean boolFoo(Object... args) {
return args == null;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelInputEx.java
similarity index 56%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelInputEx.java
index 01746a7..221809c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelInputEx.java
@@ -2,11 +2,11 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
+ * The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,22 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.externalize;
-/** Stubs */
-public class Stubs {
- /** */
- public static int intFoo(Object... args) {
- return args == null ? 0 : args.length;
- }
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelInput;
- /** */
- public static boolean boolFoo(Object... args) {
- return args == null;
- }
-
- /** */
- public static String stringFoo(Object... args) {
- return args == null ? "null" : "not null";
- }
+/** */
+public interface RelInputEx extends RelInput {
+ /**
+ * @param tag Tag.
+ * @return A collation value.
+ */
+ RelCollation getCollation(String tag);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
new file mode 100644
index 0000000..f8a5b7f
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java
@@ -0,0 +1,918 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.externalize;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Modifier;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.avatica.AvaticaUtils;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MethodDeclaration;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution.Type;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFactoryImpl.JavaType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexSlot;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVariable;
+import org.apache.calcite.rex.RexWindow;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.rex.RexWindowBounds;
+import org.apache.calcite.sql.JoinConditionType;
+import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsertKeyword;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlJsonConstructorNullClause;
+import org.apache.calcite.sql.SqlJsonQueryWrapperBehavior;
+import org.apache.calcite.sql.SqlJsonValueEmptyOrErrorBehavior;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSelectKeyword;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlNameMatchers;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteException;
+
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.FRAMEWORK_CONFIG;
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.apache.ignite.internal.util.IgniteUtils.igniteClassLoader;
+
+/**
+ * Utilities for converting {@link RelNode} into JSON format.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+class RelJson {
+ /** */
+ private final RelOptCluster cluster;
+
+ /** */
+ @SuppressWarnings("PublicInnerClass") @FunctionalInterface
+ public static interface RelFactory extends Function<RelInput, RelNode> {
+ /** {@inheritDoc} */
+ @Override RelNode apply(RelInput input);
+ }
+
+ /** */
+ private static final LoadingCache<String, RelFactory> FACTORIES_CACHE = CacheBuilder.newBuilder()
+ .build(CacheLoader.from(RelJson::relFactory));
+
+ /** */
+ private static RelFactory relFactory(String typeName) {
+ Class<?> clazz = null;
+
+ if (!typeName.contains(".")) {
+ for (String package_ : PACKAGES) {
+ if ((clazz = classForName(package_ + typeName, true)) != null)
+ break;
+ }
+ }
+
+ if (clazz == null)
+ clazz = classForName(typeName, false);
+
+ assert RelNode.class.isAssignableFrom(clazz);
+
+ Constructor<RelNode> constructor;
+
+ try {
+ constructor = (Constructor<RelNode>)clazz.getConstructor(RelInput.class);
+ }
+ catch (NoSuchMethodException e) {
+ throw new IgniteException("class does not have required constructor, "
+ + clazz + "(RelInput)");
+ }
+
+ BlockBuilder builder = new BlockBuilder();
+ ParameterExpression input_ = Expressions.parameter(RelInput.class);
+ builder.add(Expressions.new_(constructor, input_));
+ MethodDeclaration declaration = Expressions.methodDecl(
+ Modifier.PUBLIC, RelNode.class, "apply", asList(input_), builder.toBlock());
+ return Commons.compile(RelFactory.class, Expressions.toString(asList(declaration), "\n", true));
+ }
+
+ /** */
+ private static final ImmutableMap<String, Enum<?>> ENUM_BY_NAME;
+
+ /** */
+ static {
+ // Build a mapping from enum constants (e.g. LEADING) to the enum
+ // that contains them (e.g. SqlTrimFunction.Flag). If there two
+ // enum constants have the same name, the builder will throw.
+ final ImmutableMap.Builder<String, Enum<?>> enumByName =
+ ImmutableMap.builder();
+
+ register(enumByName, JoinConditionType.class);
+ register(enumByName, JoinType.class);
+ register(enumByName, Direction.class);
+ register(enumByName, NullDirection.class);
+ register(enumByName, SqlTypeName.class);
+ register(enumByName, SqlKind.class);
+ register(enumByName, SqlSyntax.class);
+ register(enumByName, SqlExplain.Depth.class);
+ register(enumByName, SqlExplainFormat.class);
+ register(enumByName, SqlExplainLevel.class);
+ register(enumByName, SqlInsertKeyword.class);
+ register(enumByName, SqlJsonConstructorNullClause.class);
+ register(enumByName, SqlJsonQueryWrapperBehavior.class);
+ register(enumByName, SqlJsonValueEmptyOrErrorBehavior.class);
+ register(enumByName, SqlMatchRecognize.AfterOption.class);
+ register(enumByName, SqlSelectKeyword.class);
+ register(enumByName, SqlTrimFunction.Flag.class);
+ register(enumByName, TimeUnitRange.class);
+
+ ENUM_BY_NAME = enumByName.build();
+ }
+
+ /** */
+ private static void register(ImmutableMap.Builder<String, Enum<?>> builder, Class<? extends Enum> aClass) {
+ String preffix = aClass.getSimpleName() + "#";
+ for (Enum enumConstant : aClass.getEnumConstants())
+ builder.put(preffix + enumConstant.name(), enumConstant);
+ }
+
+ /** */
+ private static Class<?> classForName(String typeName, boolean skipNotFound) {
+ try {
+ return IgniteUtils.forName(typeName, igniteClassLoader());
+ }
+ catch (ClassNotFoundException e) {
+ if (!skipNotFound)
+ throw new IgniteException("unknown type " + typeName);
+ }
+
+ return null;
+ }
+
+ /** */
+ private static final List<String> PACKAGES =
+ ImmutableList.of(
+ "org.apache.ignite.internal.processors.query.calcite.rel.",
+ "org.apache.ignite.internal.processors.query.calcite.rel.agg.",
+ "org.apache.ignite.internal.processors.query.calcite.rel.set.",
+ "org.apache.calcite.rel.",
+ "org.apache.calcite.rel.core.",
+ "org.apache.calcite.rel.logical.",
+ "org.apache.calcite.adapter.jdbc.",
+ "org.apache.calcite.adapter.jdbc.JdbcRules$");
+
+ /** */
+ RelJson(RelOptCluster cluster) {
+ this.cluster = cluster;
+ }
+
+ /** */
+ Function<RelInput, RelNode> factory(String type) {
+ return FACTORIES_CACHE.getUnchecked(type);
+ }
+
+ /** */
+ String classToTypeName(Class<? extends RelNode> class_) {
+ if (IgniteRel.class.isAssignableFrom(class_))
+ return class_.getSimpleName();
+
+ String canonicalName = class_.getName();
+ for (String package_ : PACKAGES) {
+ if (canonicalName.startsWith(package_)) {
+ String remaining = canonicalName.substring(package_.length());
+ if (remaining.indexOf('.') < 0 && remaining.indexOf('$') < 0)
+ return remaining;
+ }
+ }
+ return canonicalName;
+ }
+
+ /** */
+ Object toJson(Object value) {
+ if (value == null
+ || value instanceof Number
+ || value instanceof String
+ || value instanceof Boolean)
+ return value;
+ else if (value instanceof Enum)
+ return toJson((Enum)value);
+ else if (value instanceof RexNode)
+ return toJson((RexNode)value);
+ else if (value instanceof RexWindow)
+ return toJson((RexWindow)value);
+ else if (value instanceof RexFieldCollation)
+ return toJson((RexFieldCollation)value);
+ else if (value instanceof RexWindowBound)
+ return toJson((RexWindowBound)value);
+ else if (value instanceof CorrelationId)
+ return toJson((CorrelationId)value);
+ else if (value instanceof List) {
+ List<Object> list = list();
+ for (Object o : (Iterable)value)
+ list.add(toJson(o));
+ return list;
+ }
+ else if (value instanceof ImmutableBitSet) {
+ List<Object> list = list();
+ for (Integer integer : (Iterable<Integer>)value)
+ list.add(toJson(integer));
+ return list;
+ }
+ else if (value instanceof Set) {
+ Set<Object> set = set();
+ for (Object o : (Iterable)value)
+ set.add(toJson(o));
+ return set;
+ }
+ else if (value instanceof DistributionTrait)
+ return toJson((DistributionTrait)value);
+ else if (value instanceof AggregateCall)
+ return toJson((AggregateCall)value);
+ else if (value instanceof RelCollationImpl)
+ return toJson((RelCollationImpl)value);
+ else if (value instanceof RelDataType)
+ return toJson((RelDataType)value);
+ else if (value instanceof RelDataTypeField)
+ return toJson((RelDataTypeField)value);
+ else
+ throw new UnsupportedOperationException("type not serializable: "
+ + value + " (type " + value.getClass().getCanonicalName() + ")");
+ }
+
+ /** */
+ RelCollation toCollation(List<Map<String, Object>> jsonFieldCollations) {
+ if (jsonFieldCollations == null)
+ return RelCollations.EMPTY;
+
+ List<RelFieldCollation> fieldCollations = jsonFieldCollations.stream()
+ .map(this::toFieldCollation)
+ .collect(Collectors.toList());
+
+ return RelCollations.of(fieldCollations);
+ }
+
+ /** */
+ IgniteDistribution toDistribution(Object distribution) {
+ if (distribution instanceof String) {
+ switch ((String)distribution) {
+ case "single":
+ return IgniteDistributions.single();
+ case "any":
+ return IgniteDistributions.any();
+ case "broadcast":
+ return IgniteDistributions.broadcast();
+ case "random":
+ return IgniteDistributions.random();
+ }
+ }
+
+ Map<String, Object> map = (Map<String, Object>)distribution;
+ Number cacheId = (Number)map.get("cacheId");
+ if (cacheId != null)
+ return IgniteDistributions.hash((List<Integer>)map.get("keys"),
+ DistributionFunction.affinity(cacheId.intValue(), cacheId));
+
+ return IgniteDistributions.hash((List<Integer>)map.get("keys"), DistributionFunction.hash());
+ }
+
+ /** */
+ RelDataType toType(RelDataTypeFactory typeFactory, Object o) {
+ if (o instanceof List) {
+ List<Map<String, Object>> jsonList = (List<Map<String, Object>>)o;
+ RelDataTypeFactory.Builder builder = typeFactory.builder();
+ for (Map<String, Object> jsonMap : jsonList)
+ builder.add((String)jsonMap.get("name"), toType(typeFactory, jsonMap));
+ return builder.build();
+ }
+ else if (o instanceof Map) {
+ Map<String, Object> map = (Map<String, Object>)o;
+ String clazz = (String)map.get("class");
+
+ if (clazz != null) {
+ RelDataType type = typeFactory.createJavaType(classForName(clazz, false));
+
+ if (Boolean.TRUE == map.get("nullable"))
+ type = typeFactory.createTypeWithNullability(type, true);
+
+ return type;
+ }
+
+ Object fields = map.get("fields");
+
+ if (fields != null)
+ return toType(typeFactory, fields);
+ else {
+ SqlTypeName sqlTypeName = toEnum(map.get("type"));
+ Integer precision = (Integer)map.get("precision");
+ Integer scale = (Integer)map.get("scale");
+ if (SqlTypeName.INTERVAL_TYPES.contains(sqlTypeName)) {
+ TimeUnit startUnit = sqlTypeName.getStartUnit();
+ TimeUnit endUnit = sqlTypeName.getEndUnit();
+ return typeFactory.createSqlIntervalType(
+ new SqlIntervalQualifier(startUnit, endUnit, SqlParserPos.ZERO));
+ }
+ else if (sqlTypeName == SqlTypeName.ARRAY)
+ return typeFactory.createArrayType(toType(typeFactory, map.get("elementType")), -1);
+ RelDataType type;
+ if (precision == null)
+ type = typeFactory.createSqlType(sqlTypeName);
+ else if (scale == null)
+ type = typeFactory.createSqlType(sqlTypeName, precision);
+ else
+ type = typeFactory.createSqlType(sqlTypeName, precision, scale);
+
+ if (Boolean.TRUE == map.get("nullable"))
+ type = typeFactory.createTypeWithNullability(type, true);
+
+ return type;
+ }
+ }
+ else {
+ SqlTypeName sqlTypeName = toEnum(o);
+ return typeFactory.createSqlType(sqlTypeName);
+ }
+ }
+
+ /** */
+ RexNode toRex(RelInput relInput, Object o) {
+ RelOptCluster cluster = relInput.getCluster();
+ RexBuilder rexBuilder = cluster.getRexBuilder();
+ if (o == null)
+ return null;
+ else if (o instanceof Map) {
+ Map map = (Map)o;
+ Map<String, Object> opMap = (Map)map.get("op");
+ IgniteTypeFactory typeFactory = Commons.typeFactory(cluster);
+ if (opMap != null) {
+ if (map.containsKey("class"))
+ opMap.put("class", map.get("class"));
+ List operands = (List)map.get("operands");
+ List<RexNode> rexOperands = toRexList(relInput, operands);
+ Object jsonType = map.get("type");
+ Map window = (Map)map.get("window");
+ if (window != null) {
+ SqlAggFunction operator = (SqlAggFunction)toOp(opMap);
+ RelDataType type = toType(typeFactory, jsonType);
+ List<RexNode> partitionKeys = new ArrayList<>();
+ if (window.containsKey("partition"))
+ partitionKeys = toRexList(relInput, (List)window.get("partition"));
+ List<RexFieldCollation> orderKeys = new ArrayList<>();
+ if (window.containsKey("order"))
+ orderKeys = toRexFieldCollationList(relInput, (List)window.get("order"));
+ RexWindowBound lowerBound;
+ RexWindowBound upperBound;
+ boolean physical;
+ if (window.get("rows-lower") != null) {
+ lowerBound = toRexWindowBound(relInput, (Map)window.get("rows-lower"));
+ upperBound = toRexWindowBound(relInput, (Map)window.get("rows-upper"));
+ physical = true;
+ }
+ else if (window.get("range-lower") != null) {
+ lowerBound = toRexWindowBound(relInput, (Map)window.get("range-lower"));
+ upperBound = toRexWindowBound(relInput, (Map)window.get("range-upper"));
+ physical = false;
+ }
+ else {
+ // No ROWS or RANGE clause
+ lowerBound = null;
+ upperBound = null;
+ physical = false;
+ }
+ boolean distinct = (Boolean)map.get("distinct");
+ return rexBuilder.makeOver(type, operator, rexOperands, partitionKeys,
+ ImmutableList.copyOf(orderKeys), lowerBound, upperBound, physical,
+ true, false, distinct, false);
+ }
+ else {
+ SqlOperator operator = toOp(opMap);
+ RelDataType type;
+ if (jsonType != null)
+ type = toType(typeFactory, jsonType);
+ else
+ type = rexBuilder.deriveReturnType(operator, rexOperands);
+ return rexBuilder.makeCall(type, operator, rexOperands);
+ }
+ }
+ Integer input = (Integer)map.get("input");
+ if (input != null) {
+ // Check if it is a local ref.
+ if (map.containsKey("type")) {
+ RelDataType type = toType(typeFactory, map.get("type"));
+ return map.get("dynamic") == Boolean.TRUE
+ ? rexBuilder.makeDynamicParam(type, input)
+ : rexBuilder.makeLocalRef(type, input);
+ }
+
+ List<RelNode> inputNodes = relInput.getInputs();
+ int i = input;
+ for (RelNode inputNode : inputNodes) {
+ RelDataType rowType = inputNode.getRowType();
+ if (i < rowType.getFieldCount()) {
+ RelDataTypeField field = rowType.getFieldList().get(i);
+ return rexBuilder.makeInputRef(field.getType(), input);
+ }
+ i -= rowType.getFieldCount();
+ }
+ throw new RuntimeException("input field " + input + " is out of range");
+ }
+
+ String field = (String)map.get("field");
+ if (field != null) {
+ Object jsonExpr = map.get("expr");
+ RexNode expr = toRex(relInput, jsonExpr);
+ return rexBuilder.makeFieldAccess(expr, field, true);
+ }
+
+ String correl = (String)map.get("correl");
+ if (correl != null) {
+ RelDataType type = toType(typeFactory, map.get("type"));
+ return rexBuilder.makeCorrel(type, new CorrelationId(correl));
+ }
+
+ if (map.containsKey("literal")) {
+ Object literal = map.get("literal");
+ RelDataType type = toType(typeFactory, map.get("type"));
+
+ if (literal == null)
+ return rexBuilder.makeNullLiteral(type);
+
+ if (type.getSqlTypeName() == SqlTypeName.SYMBOL)
+ literal = toEnum(literal);
+
+ return rexBuilder.makeLiteral(literal, type, false);
+ }
+
+ throw new UnsupportedOperationException("cannot convert to rex " + o);
+ }
+ else if (o instanceof Boolean)
+ return rexBuilder.makeLiteral((Boolean)o);
+ else if (o instanceof String)
+ return rexBuilder.makeLiteral((String)o);
+ else if (o instanceof Number) {
+ Number number = (Number)o;
+ if (number instanceof Double || number instanceof Float)
+ return rexBuilder.makeApproxLiteral(
+ BigDecimal.valueOf(number.doubleValue()));
+ else
+ return rexBuilder.makeExactLiteral(
+ BigDecimal.valueOf(number.longValue()));
+ }
+ else
+ throw new UnsupportedOperationException("cannot convert to rex " + o);
+ }
+
+ /** */
+ SqlOperator toOp(Map<String, Object> map) {
+ // in case different operator has the same kind, check with both name and kind.
+ String name = map.get("name").toString();
+ SqlKind sqlKind = toEnum(map.get("kind"));
+ SqlSyntax sqlSyntax = toEnum(map.get("syntax"));
+ List<SqlOperator> operators = new ArrayList<>();
+
+ FRAMEWORK_CONFIG.getOperatorTable().lookupOperatorOverloads(
+ new SqlIdentifier(name, new SqlParserPos(0, 0)),
+ null,
+ sqlSyntax,
+ operators,
+ SqlNameMatchers.liberal()
+ );
+
+ for (SqlOperator operator : operators)
+ if (operator.kind == sqlKind)
+ return operator;
+ String class_ = (String)map.get("class");
+ if (class_ != null)
+ return AvaticaUtils.instantiatePlugin(SqlOperator.class, class_);
+ return null;
+ }
+
+ /** */
+ <T> List<T> list() {
+ return new ArrayList<>();
+ }
+
+ /** */
+ <T> Set<T> set() {
+ return new LinkedHashSet<>();
+ }
+
+ /** */
+ <T> Map<String, T> map() {
+ return new LinkedHashMap<>();
+ }
+
+ /** */
+ private <T extends Enum<T>> T toEnum(Object o) {
+ if (o instanceof Map) {
+ Map<String, Object> map = (Map<String, Object>)o;
+ String class_ = (String)map.get("class");
+ String name = map.get("name").toString();
+ return Util.enumVal((Class<T>)classForName(class_, false), name);
+ }
+
+ assert o instanceof String && ENUM_BY_NAME.containsKey(o);
+
+ String name = (String)o;
+ return (T)ENUM_BY_NAME.get(name);
+ }
+
+ /** */
+ private RelFieldCollation toFieldCollation(Map<String, Object> map) {
+ Integer field = (Integer)map.get("field");
+ Direction direction = toEnum(map.get("direction"));
+ NullDirection nullDirection = toEnum(map.get("nulls"));
+ return new RelFieldCollation(field, direction, nullDirection);
+ }
+
+ /** */
+ private List<RexFieldCollation> toRexFieldCollationList(RelInput relInput, List<Map<String, Object>> order) {
+ if (order == null)
+ return null;
+
+ List<RexFieldCollation> list = new ArrayList<>();
+ for (Map<String, Object> o : order) {
+ RexNode expr = toRex(relInput, o.get("expr"));
+ Set<SqlKind> directions = new HashSet<>();
+ if (toEnum(o.get("direction")) == Direction.DESCENDING)
+ directions.add(SqlKind.DESCENDING);
+ if (toEnum(o.get("null-direction")) == NullDirection.FIRST)
+ directions.add(SqlKind.NULLS_FIRST);
+ else
+ directions.add(SqlKind.NULLS_LAST);
+ list.add(new RexFieldCollation(expr, directions));
+ }
+ return list;
+ }
+
+ /** */
+ private RexWindowBound toRexWindowBound(RelInput input, Map<String, Object> map) {
+ if (map == null)
+ return null;
+
+ String type = (String)map.get("type");
+ switch (type) {
+ case "CURRENT_ROW":
+ return RexWindowBounds.create(
+ SqlWindow.createCurrentRow(SqlParserPos.ZERO), null);
+ case "UNBOUNDED_PRECEDING":
+ return RexWindowBounds.create(
+ SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO), null);
+ case "UNBOUNDED_FOLLOWING":
+ return RexWindowBounds.create(
+ SqlWindow.createUnboundedFollowing(SqlParserPos.ZERO), null);
+ case "PRECEDING":
+ RexNode precedingOffset = toRex(input, map.get("offset"));
+ return RexWindowBounds.create(null,
+ input.getCluster().getRexBuilder().makeCall(
+ SqlWindow.PRECEDING_OPERATOR, precedingOffset));
+ case "FOLLOWING":
+ RexNode followingOffset = toRex(input, map.get("offset"));
+ return RexWindowBounds.create(null,
+ input.getCluster().getRexBuilder().makeCall(
+ SqlWindow.FOLLOWING_OPERATOR, followingOffset));
+ default:
+ throw new UnsupportedOperationException("cannot convert type to rex window bound " + type);
+ }
+ }
+
+ /** */
+ private List<RexNode> toRexList(RelInput relInput, List<?> operands) {
+ List<RexNode> list = new ArrayList<>();
+ for (Object operand : operands)
+ list.add(toRex(relInput, operand));
+ return list;
+ }
+
+ /** */
+ private Object toJson(Enum<?> enum0) {
+ String key = enum0.getDeclaringClass().getSimpleName() + "#" + enum0.name();
+
+ if (ENUM_BY_NAME.get(key) == enum0)
+ return key;
+
+ Map<String, Object> map = map();
+ map.put("class", enum0.getDeclaringClass().getName());
+ map.put("name", enum0.name());
+ return map;
+ }
+
+ /** */
+ private Object toJson(AggregateCall node) {
+ Map<String, Object> map = map();
+ map.put("agg", toJson(node.getAggregation()));
+ map.put("type", toJson(node.getType()));
+ map.put("distinct", node.isDistinct());
+ map.put("operands", node.getArgList());
+ map.put("filter", node.filterArg);
+ map.put("name", node.getName());
+ return map;
+ }
+
+ /** */
+ private Object toJson(RelDataType node) {
+ if (node instanceof JavaType) {
+ Map<String, Object> map = map();
+ map.put("class", ((JavaType)node).getJavaClass().getName());
+ if (node.isNullable())
+ map.put("nullable", true);
+
+ return map;
+ }
+ if (node.isStruct()) {
+ List<Object> list = list();
+ for (RelDataTypeField field : node.getFieldList())
+ list.add(toJson(field));
+ return list;
+ }
+ else if (node.getSqlTypeName() == SqlTypeName.ARRAY) {
+ Map<String, Object> map = map();
+ map.put("type", toJson(node.getSqlTypeName()));
+ map.put("elementType", toJson(node.getComponentType()));
+ return map;
+ }
+ else {
+ Map<String, Object> map = map();
+ map.put("type", toJson(node.getSqlTypeName()));
+ if (node.isNullable())
+ map.put("nullable", true);
+ if (node.getSqlTypeName().allowsPrec())
+ map.put("precision", node.getPrecision());
+ if (node.getSqlTypeName().allowsScale())
+ map.put("scale", node.getScale());
+ return map;
+ }
+ }
+
+ /** */
+ private Object toJson(RelDataTypeField node) {
+ Map<String, Object> map;
+ if (node.getType().isStruct()) {
+ map = map();
+ map.put("fields", toJson(node.getType()));
+ }
+ else
+ map = (Map<String, Object>)toJson(node.getType());
+ map.put("name", node.getName());
+ return map;
+ }
+
+ /** */
+ private Object toJson(CorrelationId node) {
+ return node.getId();
+ }
+
+ /** */
+ private Object toJson(RexNode node) {
+ // removes calls to SEARCH and the included Sarg and converts them to comparisons
+ node = RexUtil.expandSearch(cluster.getRexBuilder(), null, node);
+
+ Map<String, Object> map;
+ switch (node.getKind()) {
+ case FIELD_ACCESS:
+ map = map();
+ RexFieldAccess fieldAccess = (RexFieldAccess)node;
+ map.put("field", fieldAccess.getField().getName());
+ map.put("expr", toJson(fieldAccess.getReferenceExpr()));
+
+ return map;
+ case LITERAL:
+ RexLiteral literal = (RexLiteral)node;
+ Object value = literal.getValue3();
+ map = map();
+ map.put("literal", toJson(value));
+ map.put("type", toJson(node.getType()));
+
+ return map;
+ case INPUT_REF:
+ map = map();
+ map.put("input", ((RexSlot)node).getIndex());
+ map.put("name", ((RexVariable)node).getName());
+
+ return map;
+ case DYNAMIC_PARAM:
+ map = map();
+ map.put("input", ((RexDynamicParam)node).getIndex());
+ map.put("name", ((RexVariable)node).getName());
+ map.put("type", toJson(node.getType()));
+ map.put("dynamic", true);
+
+ return map;
+ case LOCAL_REF:
+ map = map();
+ map.put("input", ((RexSlot)node).getIndex());
+ map.put("name", ((RexVariable)node).getName());
+ map.put("type", toJson(node.getType()));
+
+ return map;
+ case CORREL_VARIABLE:
+ map = map();
+ map.put("correl", ((RexVariable)node).getName());
+ map.put("type", toJson(node.getType()));
+
+ return map;
+ default:
+ if (node instanceof RexCall) {
+ RexCall call = (RexCall)node;
+ map = map();
+ map.put("op", toJson(call.getOperator()));
+ List<Object> list = list();
+
+ for (RexNode operand : call.getOperands())
+ list.add(toJson(operand));
+
+ map.put("operands", list);
+
+ if (node.getKind() == SqlKind.CAST)
+ map.put("type", toJson(node.getType()));
+
+ if (call.getOperator() instanceof SqlFunction)
+ if (((SqlFunction)call.getOperator()).getFunctionType().isUserDefined()) {
+ SqlOperator op = call.getOperator();
+ map.put("class", op.getClass().getName());
+ map.put("type", toJson(node.getType()));
+ map.put("deterministic", op.isDeterministic());
+ map.put("dynamic", op.isDynamicFunction());
+ }
+
+ if (call instanceof RexOver) {
+ RexOver over = (RexOver)call;
+ map.put("distinct", over.isDistinct());
+ map.put("type", toJson(node.getType()));
+ map.put("window", toJson(over.getWindow()));
+ }
+
+ return map;
+ }
+ throw new UnsupportedOperationException("unknown rex " + node);
+ }
+ }
+
+ /** */
+ private Object toJson(RexWindow window) {
+ Map<String, Object> map = map();
+ if (!window.partitionKeys.isEmpty())
+ map.put("partition", toJson(window.partitionKeys));
+ if (!window.orderKeys.isEmpty())
+ map.put("order", toJson(window.orderKeys));
+ if (window.getLowerBound() == null) {
+ // No ROWS or RANGE clause
+ }
+ else if (window.getUpperBound() == null)
+ if (window.isRows())
+ map.put("rows-lower", toJson(window.getLowerBound()));
+ else
+ map.put("range-lower", toJson(window.getLowerBound()));
+ else if (window.isRows()) {
+ map.put("rows-lower", toJson(window.getLowerBound()));
+ map.put("rows-upper", toJson(window.getUpperBound()));
+ }
+ else {
+ map.put("range-lower", toJson(window.getLowerBound()));
+ map.put("range-upper", toJson(window.getUpperBound()));
+ }
+ return map;
+ }
+
+ /** */
+ private Object toJson(DistributionTrait distribution) {
+ Type type = distribution.getType();
+
+ switch (type) {
+ case ANY:
+ case BROADCAST_DISTRIBUTED:
+ case RANDOM_DISTRIBUTED:
+ case SINGLETON:
+
+ return type.shortName;
+ case HASH_DISTRIBUTED:
+ Map<String, Object> map = map();
+ List<Object> keys = list();
+ for (Integer key : distribution.getKeys())
+ keys.add(toJson(key));
+
+ map.put("keys", keys);
+
+ DistributionFunction function = distribution.function();
+
+ if (function.affinity())
+ map.put("cacheId", function.cacheId());
+
+ return map;
+ default:
+ throw new AssertionError("Unexpected distribution type.");
+ }
+ }
+
+ /** */
+ private Object toJson(RelCollationImpl node) {
+ List<Object> list = list();
+ for (RelFieldCollation fieldCollation : node.getFieldCollations()) {
+ Map<String, Object> map = map();
+ map.put("field", fieldCollation.getFieldIndex());
+ map.put("direction", toJson(fieldCollation.getDirection()));
+ map.put("nulls", toJson(fieldCollation.nullDirection));
+ list.add(map);
+ }
+ return list;
+ }
+
+ /** */
+ private Object toJson(RexFieldCollation collation) {
+ Map<String, Object> map = map();
+ map.put("expr", toJson(collation.left));
+ map.put("direction", toJson(collation.getDirection()));
+ map.put("null-direction", toJson(collation.getNullDirection()));
+ return map;
+ }
+
+ /** */
+ private Object toJson(RexWindowBound windowBound) {
+ Map<String, Object> map = map();
+ if (windowBound.isCurrentRow())
+ map.put("type", "CURRENT_ROW");
+ else if (windowBound.isUnbounded())
+ map.put("type", windowBound.isPreceding() ? "UNBOUNDED_PRECEDING" : "UNBOUNDED_FOLLOWING");
+ else {
+ map.put("type", windowBound.isPreceding() ? "PRECEDING" : "FOLLOWING");
+ map.put("offset", toJson(windowBound.getOffset()));
+ }
+ return map;
+ }
+
+ /** */
+ private Object toJson(SqlOperator operator) {
+ // User-defined operators are not yet handled.
+ Map map = map();
+ map.put("name", operator.getName());
+ map.put("kind", toJson(operator.kind));
+ map.put("syntax", toJson(operator.getSyntax()));
+ return map;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
new file mode 100644
index 0000000..0d0a9a3
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonReader.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.externalize;
+
+import java.io.IOException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.lang.IgniteException;
+
+/** */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class RelJsonReader {
+ /** */
+ private static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF = new TypeReference<>() {};
+
+ /** */
+ private final ObjectMapper mapper = new ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+
+ /** */
+ private final RelOptCluster cluster;
+
+ /** */
+ private final RelOptSchema relOptSchema;
+
+ /** */
+ private final RelJson relJson;
+
+ /** */
+ private final Map<String, RelNode> relMap = new LinkedHashMap<>();
+
+ /** */
+ private RelNode lastRel;
+
+ /** */
+ public static <T extends RelNode> T fromJson(PlanningContext ctx, String json) {
+ RelJsonReader reader = new RelJsonReader(ctx.cluster(), ctx.catalogReader());
+
+ return (T)reader.read(json);
+ }
+
+ /** */
+ public RelJsonReader(RelOptCluster cluster, RelOptSchema relOptSchema) {
+ this.cluster = cluster;
+ this.relOptSchema = relOptSchema;
+
+ relJson = new RelJson(cluster);
+ }
+
+ /** */
+ public RelNode read(String s) {
+ try {
+ lastRel = null;
+ Map<String, Object> o = mapper.readValue(s, TYPE_REF);
+ List<Map<String, Object>> rels = (List)o.get("rels");
+ readRels(rels);
+ return lastRel;
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** */
+ private void readRels(List<Map<String, Object>> jsonRels) {
+ for (Map<String, Object> jsonRel : jsonRels)
+ readRel(jsonRel);
+ }
+
+ /** */
+ private void readRel(Map<String, Object> jsonRel) {
+ String id = (String)jsonRel.get("id");
+ String type = (String)jsonRel.get("relOp");
+ Function<RelInput, RelNode> factory = relJson.factory(type);
+ RelNode rel = factory.apply(new RelInputImpl(jsonRel));
+ relMap.put(id, rel);
+ lastRel = rel;
+ }
+
+ /** */
+ private class RelInputImpl implements RelInputEx {
+ /** */
+ private final Map<String, Object> jsonRel;
+
+ /** */
+ private RelInputImpl(Map<String, Object> jsonRel) {
+ this.jsonRel = jsonRel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelOptCluster getCluster() {
+ return cluster;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelTraitSet getTraitSet() {
+ return cluster.traitSet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelOptTable getTable(String table) {
+ List<String> list = getStringList(table);
+ return relOptSchema.getTableForMember(list);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelNode getInput() {
+ List<RelNode> inputs = getInputs();
+ assert inputs.size() == 1;
+ return inputs.get(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<RelNode> getInputs() {
+ List<String> jsonInputs = getStringList("inputs");
+ if (jsonInputs == null)
+ return ImmutableList.of(lastRel);
+ List<RelNode> inputs = new ArrayList<>();
+ for (String jsonInput : jsonInputs)
+ inputs.add(lookupInput(jsonInput));
+ return inputs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RexNode getExpression(String tag) {
+ return relJson.toRex(this, jsonRel.get(tag));
+ }
+
+ /** {@inheritDoc} */
+ @Override public ImmutableBitSet getBitSet(String tag) {
+ return ImmutableBitSet.of(getIntegerList(tag));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<ImmutableBitSet> getBitSetList(String tag) {
+ List<List<Integer>> list = getIntegerListList(tag);
+ if (list == null)
+ return null;
+ ImmutableList.Builder<ImmutableBitSet> builder =
+ ImmutableList.builder();
+ for (List<Integer> integers : list)
+ builder.add(ImmutableBitSet.of(integers));
+ return builder.build();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<String> getStringList(String tag) {
+ return (List<String>)jsonRel.get(tag);
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Integer> getIntegerList(String tag) {
+ return (List<Integer>)jsonRel.get(tag);
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<Integer>> getIntegerListList(String tag) {
+ return (List<List<Integer>>)jsonRel.get(tag);
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<AggregateCall> getAggregateCalls(String tag) {
+ List<Map<String, Object>> jsonAggs = (List)jsonRel.get(tag);
+ List<AggregateCall> inputs = new ArrayList<>();
+ for (Map<String, Object> jsonAggCall : jsonAggs)
+ inputs.add(toAggCall(jsonAggCall));
+ return inputs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object get(String tag) {
+ return jsonRel.get(tag);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getString(String tag) {
+ return (String)jsonRel.get(tag);
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getFloat(String tag) {
+ return ((Number)jsonRel.get(tag)).floatValue();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean getBoolean(String tag, boolean default_) {
+ Boolean b = (Boolean)jsonRel.get(tag);
+ return b != null ? b : default_;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <E extends Enum<E>> E getEnum(String tag, Class<E> enumClass) {
+ return Util.enumVal(enumClass,
+ getString(tag).toUpperCase(Locale.ROOT));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<RexNode> getExpressionList(String tag) {
+ List<Object> jsonNodes = (List)jsonRel.get(tag);
+ List<RexNode> nodes = new ArrayList<>();
+ for (Object jsonNode : jsonNodes)
+ nodes.add(relJson.toRex(this, jsonNode));
+ return nodes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelDataType getRowType(String tag) {
+ Object o = jsonRel.get(tag);
+ return relJson.toType(Commons.typeFactory(cluster), o);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelDataType getRowType(String expressionsTag, String fieldsTag) {
+ List<RexNode> expressionList = getExpressionList(expressionsTag);
+ List<String> names =
+ (List<String>)get(fieldsTag);
+ return Commons.typeFactory(cluster).createStructType(
+ new AbstractList<Map.Entry<String, RelDataType>>() {
+ @Override public Map.Entry<String, RelDataType> get(int index) {
+ return Pair.of(names.get(index),
+ expressionList.get(index).getType());
+ }
+
+ @Override public int size() {
+ return names.size();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelCollation getCollation() {
+ return relJson.toCollation((List)get("collation"));
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelCollation getCollation(String tag) {
+ return relJson.toCollation((List)get(tag));
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelDistribution getDistribution() {
+ return relJson.toDistribution(get("distribution"));
+ }
+
+ /** {@inheritDoc} */
+ @Override public ImmutableList<ImmutableList<RexLiteral>> getTuples(String tag) {
+ List<List> jsonTuples = (List)get(tag);
+ ImmutableList.Builder<ImmutableList<RexLiteral>> builder =
+ ImmutableList.builder();
+ for (List jsonTuple : jsonTuples)
+ builder.add(getTuple(jsonTuple));
+ return builder.build();
+ }
+
+ /** */
+ private RelNode lookupInput(String jsonInput) {
+ RelNode node = relMap.get(jsonInput);
+ if (node == null)
+ throw new RuntimeException("unknown id " + jsonInput
+ + " for relational expression");
+ return node;
+ }
+
+ /** */
+ private ImmutableList<RexLiteral> getTuple(List jsonTuple) {
+ ImmutableList.Builder<RexLiteral> builder =
+ ImmutableList.builder();
+ for (Object jsonValue : jsonTuple)
+ builder.add((RexLiteral)relJson.toRex(this, jsonValue));
+ return builder.build();
+ }
+
+ /** */
+ private AggregateCall toAggCall(Map<String, Object> jsonAggCall) {
+ Map<String, Object> aggMap = (Map)jsonAggCall.get("agg");
+ SqlAggFunction aggregation = (SqlAggFunction)relJson.toOp(aggMap);
+ Boolean distinct = (Boolean)jsonAggCall.get("distinct");
+ List<Integer> operands = (List<Integer>)jsonAggCall.get("operands");
+ Integer filterOperand = (Integer)jsonAggCall.get("filter");
+ RelDataType type = relJson.toType(Commons.typeFactory(cluster), jsonAggCall.get("type"));
+ String name = (String)jsonAggCall.get("name");
+ return AggregateCall.create(aggregation, distinct, false, false, operands,
+ filterOperand == null ? -1 : filterOperand,
+ RelCollations.EMPTY,
+ type, name);
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java
new file mode 100644
index 0000000..b96ec8d
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJsonWriter.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.externalize;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Callback for a relational expression to dump itself as JSON.
+ *
+ * @see RelJsonReader
+ */
+public class RelJsonWriter implements RelWriter {
+ /** */
+ private static final boolean PRETTY_PRINT = false;
+ // TODO: IgniteSystemProperties.getBoolean("IGNITE_CALCITE_REL_JSON_PRETTY_PRINT", false);
+
+ /** */
+ private final RelJson relJson;
+
+ /** */
+ private final List<Object> relList = new ArrayList<>();
+
+ /** */
+ private final Map<RelNode, String> relIdMap = new IdentityHashMap<>();
+
+ /** */
+ private final boolean pretty;
+
+ /** */
+ private String previousId;
+
+ /** */
+ private List<Pair<String, Object>> items = new ArrayList<>();
+
+ /** */
+ public static String toJson(RelNode rel) {
+ RelJsonWriter writer = new RelJsonWriter(rel.getCluster(), PRETTY_PRINT);
+ rel.explain(writer);
+
+ return writer.asString();
+ }
+
+ /** */
+ public RelJsonWriter(RelOptCluster cluster, boolean pretty) {
+ this.pretty = pretty;
+
+ relJson = new RelJson(cluster);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final void explain(RelNode rel, List<Pair<String, Object>> valueList) {
+ explain_(rel, valueList);
+ }
+
+ /** {@inheritDoc} */
+ @Override public SqlExplainLevel getDetailLevel() {
+ return SqlExplainLevel.ALL_ATTRIBUTES;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelWriter item(String term, Object value) {
+ items.add(Pair.of(term, value));
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelWriter done(RelNode node) {
+ List<Pair<String, Object>> current0 = items;
+ items = new ArrayList<>();
+ explain_(node, current0);
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean nest() {
+ return true;
+ }
+
+ /** */
+ public String asString() {
+ try {
+ StringWriter writer = new StringWriter();
+ ObjectMapper mapper = new ObjectMapper();
+
+ ObjectWriter writer0 = pretty
+ ? mapper.writer(new DefaultPrettyPrinter())
+ : mapper.writer();
+
+ writer0
+ .withRootName("rels")
+ .writeValue(writer, relList);
+
+ return writer.toString();
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** */
+ private void explain_(RelNode rel, List<Pair<String, Object>> values) {
+ final Map<String, Object> map = relJson.map();
+
+ map.put("id", null); // ensure that id is the first attribute
+ map.put("relOp", relJson.classToTypeName(rel.getClass()));
+
+ for (Pair<String, Object> value : values) {
+ if (value.right instanceof RelNode)
+ continue;
+
+ map.put(value.left, relJson.toJson(value.right));
+ }
+ // omit 'inputs: ["3"]' if "3" is the preceding rel
+ final List<Object> list = explainInputs(rel.getInputs());
+ if (list.size() != 1 || !list.get(0).equals(previousId))
+ map.put("inputs", list);
+
+ final String id = Integer.toString(relIdMap.size());
+ relIdMap.put(rel, id);
+ map.put("id", id);
+
+ relList.add(map);
+ previousId = id;
+ }
+
+ /** */
+ private List<Object> explainInputs(List<RelNode> inputs) {
+ final List<Object> list = relJson.list();
+ for (RelNode input : inputs) {
+ String id = relIdMap.get(input);
+ if (id == null) {
+ input.explain(this);
+ id = previousId;
+ }
+ list.add(id);
+ }
+ return list;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
new file mode 100644
index 0000000..fa6b1f8
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.IgniteIntList;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
+
+/** */
+public class ColocationGroup {
+ /** */
+ private static final int SYNTHETIC_PARTITIONS_COUNT = 512;
+ // TODO: IgniteSystemProperties.getInteger("IGNITE_CALCITE_SYNTHETIC_PARTITIONS_COUNT", 512);
+
+ /** */
+ private List<Long> sourceIds;
+
+ /** */
+ private List<UUID> nodeIds;
+
+ /** */
+ private List<List<UUID>> assignments;
+
+ /** */
+ public static ColocationGroup forNodes(List<UUID> nodeIds) {
+ return new ColocationGroup(null, nodeIds, null);
+ }
+
+ /** */
+ public static ColocationGroup forAssignments(List<List<UUID>> assignments) {
+ return new ColocationGroup(null, null, assignments);
+ }
+
+ /** */
+ public static ColocationGroup forSourceId(long sourceId) {
+ return new ColocationGroup(Collections.singletonList(sourceId), null, null);
+ }
+
+ /** */
+ private ColocationGroup(List<Long> sourceIds, List<UUID> nodeIds, List<List<UUID>> assignments) {
+ this.sourceIds = sourceIds;
+ this.nodeIds = nodeIds;
+ this.assignments = assignments;
+ }
+
+ /**
+ * @return Lists of colocation group sources.
+ */
+ public List<Long> sourceIds() {
+ return sourceIds == null ? Collections.emptyList() : sourceIds;
+ }
+
+ /**
+ * @return Lists of nodes capable to execute a query fragment for what the mapping is calculated.
+ */
+ public List<UUID> nodeIds() {
+ return nodeIds == null ? Collections.emptyList() : nodeIds;
+ }
+
+ /**
+ * @return List of partitions (index) and nodes (items) having an appropriate partition in
+ * OWNING state, calculated for distributed tables, involved in query execution.
+ */
+ public List<List<UUID>> assignments() {
+ return assignments == null ? Collections.emptyList() : assignments;
+ }
+
+ /**
+ * Prunes involved partitions (hence nodes, involved in query execution) on the basis of filter,
+ * its distribution, query parameters and original nodes mapping.
+ * @param rel Filter.
+ * @return Resulting nodes mapping.
+ */
+ public ColocationGroup prune(IgniteRel rel) {
+ return this; // TODO https://issues.apache.org/jira/browse/IGNITE-12455
+ }
+
+ /** */
+ public boolean belongs(long sourceId) {
+ return sourceIds != null && sourceIds.contains(sourceId);
+ }
+
+ /**
+ * Merges this mapping with given one.
+ * @param other Mapping to merge with.
+ * @return Merged nodes mapping.
+ * @throws ColocationMappingException If involved nodes intersection is empty, hence there is no nodes capable to execute
+ * being calculated fragment.
+ */
+ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingException {
+ List<Long> sourceIds;
+ if (this.sourceIds == null || other.sourceIds == null)
+ sourceIds = firstNotNull(this.sourceIds, other.sourceIds);
+ else
+ sourceIds = Commons.combine(this.sourceIds, other.sourceIds);
+
+ List<UUID> nodeIds;
+ if (this.nodeIds == null || other.nodeIds == null)
+ nodeIds = firstNotNull(this.nodeIds, other.nodeIds);
+ else
+ nodeIds = Commons.intersect(other.nodeIds, this.nodeIds);
+
+ if (nodeIds != null && nodeIds.isEmpty()) {
+ throw new ColocationMappingException("Failed to map fragment to location. " +
+ "Replicated query parts are not co-located on all nodes");
+ }
+
+ List<List<UUID>> assignments;
+ if (this.assignments == null || other.assignments == null) {
+ assignments = firstNotNull(this.assignments, other.assignments);
+
+ if (assignments != null && nodeIds != null) {
+ Set<UUID> filter = new HashSet<>(nodeIds);
+ List<List<UUID>> assignments0 = new ArrayList<>(assignments.size());
+
+ for (int i = 0; i < assignments.size(); i++) {
+ List<UUID> assignment = Commons.intersect(filter, assignments.get(i));
+
+ if (assignment.isEmpty()) { // TODO check with partition filters
+ throw new ColocationMappingException("Failed to map fragment to location. " +
+ "Partition mapping is empty [part=" + i + "]");
+ }
+
+ assignments0.add(assignment);
+ }
+
+ assignments = assignments0;
+ }
+ }
+ else {
+ assert this.assignments.size() == other.assignments.size();
+ assignments = new ArrayList<>(this.assignments.size());
+ Set<UUID> filter = nodeIds == null ? null : new HashSet<>(nodeIds);
+ for (int i = 0; i < this.assignments.size(); i++) {
+ List<UUID> assignment = Commons.intersect(this.assignments.get(i), other.assignments.get(i));
+
+ if (filter != null)
+ assignment.retainAll(filter);
+
+ if (assignment.isEmpty()) // TODO check with partition filters
+ throw new ColocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]");
+
+ assignments.add(assignment);
+ }
+ }
+
+ return new ColocationGroup(sourceIds, nodeIds, assignments);
+ }
+
+ /** */
+ public ColocationGroup finalaze() {
+ if (assignments == null && nodeIds == null)
+ return this;
+
+ if (assignments != null) {
+ List<List<UUID>> assignments = new ArrayList<>(this.assignments.size());
+ Set<UUID> nodes = new HashSet<>();
+ for (List<UUID> assignment : this.assignments) {
+ UUID first = first(assignment);
+ if (first != null)
+ nodes.add(first);
+ assignments.add(first != null ? Collections.singletonList(first) : Collections.emptyList());
+ }
+
+ return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments);
+ }
+
+ return forNodes0(nodeIds);
+ }
+
+ /** */
+ public ColocationGroup mapToNodes(List<UUID> nodeIds) {
+ return !nullOrEmpty(this.nodeIds) ? this : forNodes0(nodeIds);
+ }
+
+ /** */
+ @NotNull private ColocationGroup forNodes0(List<UUID> nodeIds) {
+ List<List<UUID>> assignments = new ArrayList<>(SYNTHETIC_PARTITIONS_COUNT);
+ for (int i = 0; i < SYNTHETIC_PARTITIONS_COUNT; i++)
+ assignments.add(asList(nodeIds.get(i % nodeIds.size())));
+ return new ColocationGroup(sourceIds, nodeIds, assignments);
+ }
+
+ /**
+ * Returns List of partitions to scan on the given node.
+ *
+ * @param nodeId Cluster node ID.
+ * @return List of partitions to scan on the given node.
+ */
+ public int[] partitions(UUID nodeId) {
+ IgniteIntList parts = new IgniteIntList(assignments.size());
+
+ for (int i = 0; i < assignments.size(); i++) {
+ List<UUID> assignment = assignments.get(i);
+ if (Objects.equals(nodeId, first(assignment)))
+ parts.add(i);
+ }
+
+ return parts.array();
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationMappingException.java
similarity index 65%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationMappingException.java
index 01746a7..a1be2b7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationMappingException.java
@@ -14,22 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
-/** Stubs */
-public class Stubs {
- /** */
- public static int intFoo(Object... args) {
- return args == null ? 0 : args.length;
- }
-
- /** */
- public static boolean boolFoo(Object... args) {
- return args == null;
- }
+package org.apache.ignite.internal.processors.query.calcite.metadata;
- /** */
- public static String stringFoo(Object... args) {
- return args == null ? "null" : "not null";
+/**
+ *
+ */
+public class ColocationMappingException extends Exception {
+ /**
+ * @param message Message.
+ */
+ public ColocationMappingException(String message) {
+ super(message);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
new file mode 100644
index 0000000..5d8f2ad
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
+
+/**
+ *
+ */
+public class FragmentMapping {
+ /** */
+ private List<ColocationGroup> colocationGroups;
+
+ /** */
+ public FragmentMapping() {
+ }
+
+ /** */
+ private FragmentMapping(ColocationGroup colocationGroup) {
+ this(asList(colocationGroup));
+ }
+
+ /** */
+ private FragmentMapping(List<ColocationGroup> colocationGroups) {
+ this.colocationGroups = colocationGroups;
+ }
+
+ /** */
+ public static FragmentMapping create() {
+ return new FragmentMapping(Collections.emptyList());
+ }
+
+ /** */
+ public static FragmentMapping create(UUID nodeId) {
+ return new FragmentMapping(ColocationGroup.forNodes(Collections.singletonList(nodeId)));
+ }
+
+ /** */
+ public static FragmentMapping create(long sourceId) {
+ return new FragmentMapping(ColocationGroup.forSourceId(sourceId));
+ }
+
+ /** */
+ public static FragmentMapping create(long sourceId, ColocationGroup group) {
+ try {
+ return new FragmentMapping(ColocationGroup.forSourceId(sourceId).colocate(group));
+ }
+ catch (ColocationMappingException e) {
+ throw new AssertionError(e); // Cannot happen
+ }
+ }
+
+ /** */
+ public boolean colocated() {
+ return colocationGroups.isEmpty() || colocationGroups.size() == 1;
+ }
+
+ /** */
+ public FragmentMapping prune(IgniteRel rel) {
+ if (colocationGroups.size() != 1)
+ return this;
+
+ return new FragmentMapping(first(colocationGroups).prune(rel));
+ }
+
+ /** */
+ public FragmentMapping combine(FragmentMapping other) {
+ return new FragmentMapping(Commons.combine(colocationGroups, other.colocationGroups));
+ }
+
+ /** */
+ public FragmentMapping colocate(FragmentMapping other) throws ColocationMappingException {
+ assert colocated() && other.colocated();
+
+ ColocationGroup first = first(colocationGroups);
+ ColocationGroup second = first(other.colocationGroups);
+
+ if (first == null && second == null)
+ return this;
+ else if (first == null || second == null)
+ return new FragmentMapping(firstNotNull(first, second));
+ else
+ return new FragmentMapping(first.colocate(second));
+ }
+
+ /** */
+ public List<UUID> nodeIds() {
+ return colocationGroups.stream()
+ .flatMap(g -> g.nodeIds().stream())
+ .distinct().collect(Collectors.toList());
+ }
+
+ /** */
+ public FragmentMapping finalize(Supplier<List<UUID>> nodesSource) {
+ if (colocationGroups.isEmpty())
+ return this;
+
+ List<ColocationGroup> colocationGroups = this.colocationGroups;
+
+ colocationGroups = Commons.transform(colocationGroups, ColocationGroup::finalaze);
+ List<UUID> nodes = nodeIds(), nodes0 = nodes.isEmpty() ? nodesSource.get() : nodes;
+ colocationGroups = Commons.transform(colocationGroups, g -> g.mapToNodes(nodes0));
+
+ return new FragmentMapping(colocationGroups);
+ }
+
+ /** */
+ public @NotNull ColocationGroup findGroup(long sourceId) {
+ List<ColocationGroup> groups = colocationGroups.stream()
+ .filter(c -> c.belongs(sourceId))
+ .collect(Collectors.toList());
+
+ if (groups.isEmpty())
+ throw new IllegalStateException("Failed to find group with given id. [sourceId=" + sourceId + "]");
+ else if (groups.size() > 1)
+ throw new IllegalStateException("Multiple groups with the same id found. [sourceId=" + sourceId + "]");
+
+ return first(groups);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMappingException.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMappingException.java
new file mode 100644
index 0000000..19a0d75
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMappingException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+
+/**
+ *
+ */
+public class FragmentMappingException extends RuntimeException {
+ /** */
+ private final Fragment fragment;
+
+ /** */
+ private final RelNode node;
+
+ /**
+ *
+ * @param message Message.
+ * @param node Node of a query plan, where the exception was thrown.
+ * @param cause Cause.
+ */
+ public FragmentMappingException(String message, Fragment fragment, RelNode node, Throwable cause) {
+ super(message, cause);
+ this.fragment = fragment;
+ this.node = node;
+ }
+
+ /**
+ * @return Fragment of a query plan, where the exception was thrown.
+ */
+ public Fragment fragment() {
+ return fragment;
+ }
+
+ /**
+ * @return Node of a query plan, where the exception was thrown.
+ */
+ public RelNode node() {
+ return node;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
new file mode 100644
index 0000000..e16ce58
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMappingMetadata;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableFunctionScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
+
+/**
+ * Implementation class for {@link RelMetadataQueryEx#fragmentMapping(RelNode)} method call.
+ */
+public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingMetadata> {
+ /**
+ * Metadata provider, responsible for nodes mapping request. It uses this implementation class under the hood.
+ */
+ public static final RelMetadataProvider SOURCE =
+ ReflectiveRelMetadataProvider.reflectiveSource(
+ IgniteMethod.FRAGMENT_MAPPING.method(), new IgniteMdFragmentMapping());
+
+ /** {@inheritDoc} */
+ @Override public MetadataDef<FragmentMappingMetadata> getDef() {
+ return FragmentMappingMetadata.DEF;
+ }
+
+ /**
+ * Requests meta information about nodes capable to execute a query over particular partitions.
+ *
+ * @param rel Relational node.
+ * @param mq Metadata query instance. Used to request appropriate metadata from node children.
+ * @return Nodes mapping, representing a list of nodes capable to execute a query over particular partitions.
+ */
+ public FragmentMapping fragmentMapping(RelNode rel, RelMetadataQuery mq) {
+ throw new AssertionError();
+ }
+
+ /**
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+ */
+ public FragmentMapping fragmentMapping(RelSubset rel, RelMetadataQuery mq) {
+ throw new AssertionError();
+ }
+
+ /**
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+ */
+ public FragmentMapping fragmentMapping(SingleRel rel, RelMetadataQuery mq) {
+ return _fragmentMapping(rel.getInput(), mq);
+ }
+
+ /**
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+ *
+ * {@link ColocationMappingException} may be thrown on two children nodes locations merge. This means
+ * that the fragment (which part the parent node is) cannot be executed on any node and additional exchange
+ * is needed. This case we throw {@link NodeMappingException} with an edge, where we need the additional
+ * exchange. After the exchange is put into the fragment and the fragment is split into two ones, fragment meta
+ * information will be recalculated for all fragments.
+ */
+ public FragmentMapping fragmentMapping(BiRel rel, RelMetadataQuery mq) {
+ RelNode left = rel.getLeft();
+ RelNode right = rel.getRight();
+
+ FragmentMapping fLeft = _fragmentMapping(left, mq);
+ FragmentMapping fRight = _fragmentMapping(right, mq);
+
+ try {
+ return fLeft.colocate(fRight);
+ }
+ catch (ColocationMappingException e) {
+ IgniteExchange lExch = new IgniteExchange(rel.getCluster(), left.getTraitSet(), left, TraitUtils.distribution(left));
+ IgniteExchange rExch = new IgniteExchange(rel.getCluster(), right.getTraitSet(), right, TraitUtils.distribution(right));
+
+ RelNode lVar = rel.copy(rel.getTraitSet(), ImmutableList.of(lExch, right));
+ RelNode rVar = rel.copy(rel.getTraitSet(), ImmutableList.of(left, rExch));
+
+ RelOptCost lVarCost = mq.getCumulativeCost(lVar);
+ RelOptCost rVarCost = mq.getCumulativeCost(rVar);
+
+ if (lVarCost.isLt(rVarCost))
+ throw new NodeMappingException("Failed to calculate physical distribution", left, e);
+ else
+ throw new NodeMappingException("Failed to calculate physical distribution", right, e);
+ }
+ }
+
+ /**
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+ *
+ * {@link ColocationMappingException} may be thrown on two children nodes locations merge. This means
+ * that the fragment (which part the parent node is) cannot be executed on any node and additional exchange
+ * is needed. This case we throw {@link NodeMappingException} with an edge, where we need the additional
+ * exchange. After the exchange is put into the fragment and the fragment is split into two ones, fragment meta
+ * information will be recalculated for all fragments.
+ */
+ public FragmentMapping fragmentMapping(SetOp rel, RelMetadataQuery mq) {
+ FragmentMapping res = null;
+
+ if (TraitUtils.distribution(rel) == IgniteDistributions.random()) {
+ for (RelNode input : rel.getInputs())
+ res = res == null ? _fragmentMapping(input, mq) : res.combine(_fragmentMapping(input, mq));
+ }
+ else {
+ for (RelNode input : rel.getInputs()) {
+ try {
+ res = res == null ? _fragmentMapping(input, mq) : res.colocate(_fragmentMapping(input, mq));
+ }
+ catch (ColocationMappingException e) {
+ throw new NodeMappingException("Failed to calculate physical distribution", input, e);
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+ *
+ * Prunes involved partitions (hence nodes, involved in query execution) if possible.
+ */
+ public FragmentMapping fragmentMapping(IgniteFilter rel, RelMetadataQuery mq) {
+ return _fragmentMapping(rel.getInput(), mq).prune(rel);
+ }
+
+ /**
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+ *
+ * Prunes involved partitions (hence nodes, involved in query execution) if possible.
+ */
+ public FragmentMapping fragmentMapping(IgniteTrimExchange rel, RelMetadataQuery mq) {
+ try {
+ return FragmentMapping.create(rel.sourceId())
+ .colocate(_fragmentMapping(rel.getInput(), mq));
+ }
+ catch (ColocationMappingException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ /**
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+ */
+ public FragmentMapping fragmentMapping(IgniteReceiver rel, RelMetadataQuery mq) {
+ return FragmentMapping.create(rel.exchangeId());
+ }
+
+ /**
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+ */
+ public FragmentMapping fragmentMapping(IgniteIndexScan rel, RelMetadataQuery mq) {
+ return FragmentMapping.create(rel.sourceId(),
+ rel.getTable().unwrap(IgniteTable.class).colocationGroup(Commons.context(rel)));
+ }
+
+ /**
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+ */
+ public FragmentMapping fragmentMapping(IgniteTableScan rel, RelMetadataQuery mq) {
+ return FragmentMapping.create(rel.sourceId(),
+ rel.getTable().unwrap(IgniteTable.class).colocationGroup(Commons.context(rel)));
+ }
+
+ /**
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+ */
+ public FragmentMapping fragmentMapping(IgniteValues rel, RelMetadataQuery mq) {
+ return FragmentMapping.create();
+ }
+
+ /**
+ * See {@link IgniteMdFragmentMapping#fragmentMapping(RelNode, RelMetadataQuery)}
+ */
+ public FragmentMapping fragmentMapping(IgniteTableFunctionScan rel, RelMetadataQuery mq) {
+ return FragmentMapping.create();
+ }
+
+ /**
+ * Fragment info calculation entry point.
+ * @param rel Root node of a calculated fragment.
+ * @param mq Metadata query instance.
+ * @return Fragment meta information.
+ */
+ public static FragmentMapping _fragmentMapping(RelNode rel, RelMetadataQuery mq) {
+ assert mq instanceof RelMetadataQueryEx;
+
+ return ((RelMetadataQueryEx) mq).fragmentMapping(rel);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index 9ad0a11..1b71fc1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -18,17 +18,27 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
+import org.apache.calcite.rel.metadata.Metadata;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
/**
* Utility class, holding metadata related interfaces and metadata providers.
*/
public class IgniteMetadata {
+ /** */
public static final RelMetadataProvider METADATA_PROVIDER =
ChainedRelMetadataProvider.of(
ImmutableList.of(
+ // Ignite specific providers
+ IgniteMdFragmentMapping.SOURCE,
+
// Ignite overriden providers
IgniteMdDistribution.SOURCE,
IgniteMdPercentageOriginalRows.SOURCE,
@@ -42,4 +52,18 @@ public class IgniteMetadata {
// Basic providers
DefaultRelMetadataProvider.INSTANCE));
+
+ /** */
+ public interface FragmentMappingMetadata extends Metadata {
+ MetadataDef<FragmentMappingMetadata> DEF = MetadataDef.of(FragmentMappingMetadata.class,
+ FragmentMappingMetadata.Handler.class, IgniteMethod.FRAGMENT_MAPPING.method());
+
+ /** Determines how the rows are distributed. */
+ FragmentMapping fragmentMapping();
+
+ /** Handler API. */
+ interface Handler extends MetadataHandler<FragmentMappingMetadata> {
+ FragmentMapping fragmentMapping(RelNode r, RelMetadataQuery mq);
+ }
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java
new file mode 100644
index 0000000..e59dbfe
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Service is responsible for nodes mapping calculation.
+ */
+public interface MappingService {
+ /**
+ * Returns Nodes responsible for executing intermediate fragments (fragments without Scan leafs). Such fragments may be executed
+ * on any cluster node, actual list of nodes is chosen on the basis of adopted selection strategy (using node filter).
+ *
+ * @param topVer Topology version.
+ * @param single Flag, indicating that a fragment should execute in a single node.
+ * @param nodeFilter Node filter.
+ * @return Nodes mapping for intermediate fragments.
+ */
+ List<UUID> executionNodes(long topVer, boolean single, @Nullable Predicate<ClusterNode> nodeFilter);
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodeMappingException.java
similarity index 56%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodeMappingException.java
index 01746a7..6640e30 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodeMappingException.java
@@ -14,22 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
-/** Stubs */
-public class Stubs {
- /** */
- public static int intFoo(Object... args) {
- return args == null ? 0 : args.length;
- }
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ *
+ */
+public class NodeMappingException extends RuntimeException {
/** */
- public static boolean boolFoo(Object... args) {
- return args == null;
+ private final RelNode node;
+
+ /**
+ *
+ * @param message Message.
+ * @param node Node of a query plan, where the exception was thrown.
+ * @param cause Cause.
+ */
+ public NodeMappingException(String message, RelNode node, Throwable cause) {
+ super(message, cause);
+ this.node = node;
}
- /** */
- public static String stringFoo(Object... args) {
- return args == null ? "null" : "not null";
+ /**
+ * @return Node of a query plan, where the exception was thrown.
+ */
+ public RelNode node() {
+ return node;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index d405804..1758dd3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -51,6 +51,13 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
JaninoRelMetadataProvider.DEFAULT.register(types);
}
+ /** */
+ private static final IgniteMetadata.FragmentMappingMetadata.Handler SOURCE_DISTRIBUTION_INITIAL_HANDLER =
+ initialHandler(IgniteMetadata.FragmentMappingMetadata.Handler.class);
+
+ /** */
+ private IgniteMetadata.FragmentMappingMetadata.Handler sourceDistributionHandler;
+
/**
* Factory method.
*
@@ -74,4 +81,25 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
THREAD_PROVIDERS.remove();
}
}
+
+ /** */
+ private RelMetadataQueryEx() {
+ sourceDistributionHandler = SOURCE_DISTRIBUTION_INITIAL_HANDLER;
+ }
+
+ /**
+ * Calculates data location mapping for a query fragment the given relation node is a root of.
+ *
+ * @param rel Relational node.
+ * @return Fragment meta information.
+ */
+ public FragmentMapping fragmentMapping(RelNode rel) {
+ for (;;) {
+ try {
+ return sourceDistributionHandler.fragmentMapping(rel, this);
+ } catch (JaninoRelMetadataProvider.NoHandler e) {
+ sourceDistributionHandler = revise(e.relClass, IgniteMetadata.FragmentMappingMetadata.DEF);
+ }
+ }
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
new file mode 100644
index 0000000..2bfba75
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.IgniteUtils.newHashMap;
+
+/**
+ *
+ */
+public abstract class AbstractMultiStepPlan implements MultiStepPlan {
+ /** */
+ protected final Object fieldsMetadata;
+
+ /** */
+ protected final QueryTemplate queryTemplate;
+
+ /** */
+ protected ExecutionPlan executionPlan;
+
+ /** */
+ protected AbstractMultiStepPlan(QueryTemplate queryTemplate, Object fieldsMetadata) {
+ this.queryTemplate = queryTemplate;
+ this.fieldsMetadata = fieldsMetadata;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Fragment> fragments() {
+ return Objects.requireNonNull(executionPlan).fragments();
+ }
+
+ /** {@inheritDoc} */
+ @Override public FragmentMapping mapping(Fragment fragment) {
+ return mapping(fragment.fragmentId());
+ }
+
+ /** {@inheritDoc} */
+ @Override public ColocationGroup target(Fragment fragment) {
+ if (fragment.rootFragment())
+ return null;
+
+ IgniteSender sender = (IgniteSender)fragment.root();
+ return mapping(sender.targetFragmentId()).findGroup(sender.exchangeId());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<Long, List<UUID>> remotes(Fragment fragment) {
+ List<IgniteReceiver> remotes = fragment.remotes();
+
+ if (nullOrEmpty(remotes))
+ return null;
+
+ HashMap<Long, List<UUID>> res = newHashMap(remotes.size());
+
+ for (IgniteReceiver remote : remotes)
+ res.put(remote.exchangeId(), mapping(remote.sourceFragmentId()).nodeIds());
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void init(PlanningContext ctx) {
+ executionPlan = queryTemplate.map(ctx);
+ }
+
+ /** */
+ private FragmentMapping mapping(long fragmentId) {
+ return Objects.requireNonNull(executionPlan).fragments().stream()
+ .filter(f -> f.fragmentId() == fragmentId)
+ .findAny().orElseThrow(() -> new IllegalStateException("Cannot find fragment with given ID. [" +
+ "fragmentId=" + fragmentId + ", " +
+ "fragments=" + fragments() + "]"))
+ .mapping();
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
new file mode 100644
index 0000000..eda9fd9
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashIndexSpool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortedIndexSpool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableFunctionScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteMapMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteReduceMinus;
+import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSingleMinus;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+
+/** */
+public class Cloner implements IgniteRelVisitor<IgniteRel> {
+ /** */
+ private final RelOptCluster cluster;
+
+ /** */
+ private ImmutableList.Builder<IgniteReceiver> remotes;
+
+ /** */
+ Cloner(RelOptCluster cluster) {
+ this.cluster = cluster;
+ }
+
+ /**
+ * Clones and associates a plan with a new cluster.
+ *
+ * @param src Fragment to clone.
+ * @return New plan.
+ */
+ public Fragment go(Fragment src) {
+ try {
+ remotes = ImmutableList.builder();
+
+ IgniteRel newRoot = visit(src.root());
+ ImmutableList<IgniteReceiver> remotes = this.remotes.build();
+
+ return new Fragment(src.fragmentId(), newRoot, remotes, src.serialized(), src.mapping());
+ }
+ finally {
+ remotes = null;
+ }
+ }
+
+ /** */
+ public static IgniteRel clone(IgniteRel r) {
+ Cloner c = new Cloner(r.getCluster());
+
+ return c.visit(r);
+ }
+
+ /** */
+ private IgniteReceiver collect(IgniteReceiver receiver) {
+ if (remotes != null)
+ remotes.add(receiver);
+
+ return receiver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteSender rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteFilter rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTrimExchange rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteProject rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTableModify rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteNestedLoopJoin rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getLeft()),
+ visit((IgniteRel) rel.getRight())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getLeft()),
+ visit((IgniteRel) rel.getRight())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteMergeJoin rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getLeft()),
+ visit((IgniteRel) rel.getRight())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteIndexScan rel) {
+ return rel.clone(cluster, asList());
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTableScan rel) {
+ return rel.clone(cluster, asList());
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteValues rel) {
+ return rel.clone(cluster, asList());
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteUnionAll rel) {
+ return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteSort rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTableSpool rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteSortedIndexSpool rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteLimit rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteReceiver rel) {
+ return collect((IgniteReceiver)rel.clone(cluster, asList()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteExchange rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteSingleHashAggregate rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteMapHashAggregate rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteReduceHashAggregate rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteSingleSortAggregate rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteMapSortAggregate rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteReduceSortAggregate rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteHashIndexSpool rel) {
+ return rel.clone(cluster, asList(visit((IgniteRel) rel.getInput())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteSingleMinus rel) {
+ return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteMapMinus rel) {
+ return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteReduceMinus rel) {
+ return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 -> visit((IgniteRel) rel0)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTableFunctionScan rel) {
+ return rel.clone(cluster, asList());
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteRel rel) {
+ return rel.accept(this);
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java
similarity index 61%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java
index 01746a7..53dc4fa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java
@@ -14,22 +14,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
-/** Stubs */
-public class Stubs {
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ *
+ */
+class ExecutionPlan {
+ /** */
+ private final long ver;
+
+ /** */
+ private final ImmutableList<Fragment> fragments;
+
/** */
- public static int intFoo(Object... args) {
- return args == null ? 0 : args.length;
+ ExecutionPlan(long ver, List<Fragment> fragments) {
+ this.ver = ver;
+ this.fragments = ImmutableList.copyOf(fragments);
}
/** */
- public static boolean boolFoo(Object... args) {
- return args == null;
+ public long topologyVersion() {
+ return ver;
}
/** */
- public static String stringFoo(Object... args) {
- return args == null ? "null" : "not null";
+ public List<Fragment> fragments() {
+ return fragments;
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
new file mode 100644
index 0000000..aaba73c
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
+import org.apache.ignite.internal.processors.query.calcite.metadata.NodeMappingException;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.tostring.IgniteToStringExclude;
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonWriter.toJson;
+
+/**
+ * Fragment of distributed query
+ */
+public class Fragment {
+ /** */
+ private final long id;
+
+ /** */
+ private final IgniteRel root;
+
+ /** Serialized root representation. */
+ @IgniteToStringExclude
+ private final String rootSer;
+
+ /** */
+ private final FragmentMapping mapping;
+
+ /** */
+ private final ImmutableList<IgniteReceiver> remotes;
+
+ /**
+ * @param id Fragment id.
+ * @param root Root node of the fragment.
+ * @param remotes Remote sources of the fragment.
+ */
+ public Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes) {
+ this(id, root, remotes, null, null);
+ }
+
+ /** */
+ Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes, @Nullable String rootSer, @Nullable FragmentMapping mapping) {
+ this.id = id;
+ this.root = root;
+ this.remotes = ImmutableList.copyOf(remotes);
+ this.rootSer = rootSer != null ? rootSer : toJson(root);
+ this.mapping = mapping;
+ }
+
+ /**
+ * @return Fragment ID.
+ */
+ public long fragmentId() {
+ return id;
+ }
+
+ /**
+ * @return Root node.
+ */
+ public IgniteRel root() {
+ return root;
+ }
+
+ /**
+ * Lazy serialized root representation.
+ *
+ * @return Serialized form.
+ */
+ public String serialized() {
+ return rootSer;
+ }
+
+ /** */
+ public FragmentMapping mapping() {
+ return mapping;
+ }
+
+ /**
+ * @return Fragment remote sources.
+ */
+ public List<IgniteReceiver> remotes() {
+ return remotes;
+ }
+
+ /** */
+ public boolean rootFragment() {
+ return !(root instanceof IgniteSender);
+ }
+
+ /** */
+ public Fragment attach(PlanningContext ctx) {
+ RelOptCluster cluster = ctx.cluster();
+
+ return root.getCluster() == cluster ? this : new Cloner(cluster).go(this);
+ }
+
+ /** */
+ public Fragment detach() {
+ RelOptCluster cluster = PlanningContext.empty().cluster();
+
+ return root.getCluster() == cluster ? this : new Cloner(cluster).go(this);
+ }
+
+ /**
+ * Mapps the fragment to its data location.
+ * @param ctx Planner context.
+ * @param mq Metadata query.
+ */
+ Fragment map(MappingService mappingSrvc, PlanningContext ctx, RelMetadataQuery mq) throws FragmentMappingException {
+ assert root.getCluster() == ctx.cluster() : "Fragment is detached [fragment=" + this + "]";
+
+ if (mapping != null)
+ return this;
+
+ return new Fragment(id, root, remotes, rootSer, mapping(ctx, mq, nodesSource(mappingSrvc, ctx)));
+ }
+
+ /** */
+ private FragmentMapping mapping(PlanningContext ctx, RelMetadataQuery mq, Supplier<List<UUID>> nodesSource) {
+ try {
+ FragmentMapping mapping = IgniteMdFragmentMapping._fragmentMapping(root, mq);
+
+ if (rootFragment())
+ mapping = FragmentMapping.create(ctx.localNodeId()).colocate(mapping);
+
+ if (single() && mapping.nodeIds().size() > 1) {
+ // this is possible when the fragment contains scan of a replicated cache, which brings
+ // several nodes (actually all containing nodes) to the colocation group, but this fragment
+ // supposed to be executed on a single node, so let's choose one wisely
+ mapping = FragmentMapping.create(mapping.nodeIds()
+ .get(ThreadLocalRandom.current().nextInt(mapping.nodeIds().size()))).colocate(mapping);
+ }
+
+ return mapping.finalize(nodesSource);
+ }
+ catch (NodeMappingException e) {
+ throw new FragmentMappingException("Failed to calculate physical distribution", this, e.node(), e);
+ }
+ catch (ColocationMappingException e) {
+ throw new FragmentMappingException("Failed to calculate physical distribution", this, root, e);
+ }
+ }
+
+ /** */
+ @NotNull private Supplier<List<UUID>> nodesSource(MappingService mappingSrvc, PlanningContext ctx) {
+ return () -> mappingSrvc.executionNodes(ctx.topologyVersion(), single(), null);
+ }
+
+ /** */
+ private boolean single() {
+ return root instanceof IgniteSender
+ && ((IgniteSender)root).sourceDistribution().satisfies(IgniteDistributions.single());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Fragment.class, this, "root", RelOptUtil.toString(root));
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
new file mode 100644
index 0000000..a9d31ca
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ *
+ */
+public class FragmentSplitter extends IgniteRelShuttle {
+ /** */
+ private final Deque<FragmentProto> stack = new LinkedList<>();
+
+ /** */
+ private RelNode cutPoint;
+
+ /** */
+ private FragmentProto curr;
+
+ /** */
+ public FragmentSplitter(RelNode cutPoint) {
+ this.cutPoint = cutPoint;
+ }
+
+ /** */
+ public List<Fragment> go(Fragment fragment) {
+ ArrayList<Fragment> res = new ArrayList<>();
+
+ stack.push(new FragmentProto(IdGenerator.nextId(), fragment.root()));
+
+ while (!stack.isEmpty()) {
+ curr = stack.pop();
+ curr.root = visit(curr.root);
+ res.add(curr.build());
+ curr = null;
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteReceiver rel) {
+ curr.remotes.add(rel);
+
+ return rel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteExchange rel) {
+ throw new AssertionError();
+ }
+
+ /**
+ * Visits all children of a parent.
+ */
+ @Override protected IgniteRel processNode(IgniteRel rel) {
+ if (rel == cutPoint) {
+ cutPoint = null;
+
+ return split(rel);
+ }
+
+ List<IgniteRel> inputs = Commons.cast(rel.getInputs());
+
+ for (int i = 0; i < inputs.size(); i++)
+ visitChild(rel, i, inputs.get(i));
+
+ return rel;
+ }
+
+ /** */
+ private IgniteRel split(IgniteRel rel) {
+ RelOptCluster cluster = rel.getCluster();
+ RelTraitSet traits = rel.getTraitSet();
+ RelDataType rowType = rel.getRowType();
+
+ RelNode input = rel instanceof IgniteTrimExchange ? rel.getInput(0) : rel;
+
+ long targetFragmentId = curr.id;
+ long sourceFragmentId = IdGenerator.nextId();
+ long exchangeId = sourceFragmentId;
+
+ IgniteReceiver receiver = new IgniteReceiver(cluster, traits, rowType, exchangeId, sourceFragmentId);
+ IgniteSender sender = new IgniteSender(cluster, traits, input, exchangeId, targetFragmentId, rel.distribution());
+
+ curr.remotes.add(receiver);
+ stack.push(new FragmentProto(sourceFragmentId, sender));
+
+ return receiver;
+ }
+
+ /** */
+ private static class FragmentProto {
+ /** */
+ private final long id;
+
+ /** */
+ private IgniteRel root;
+
+ /** */
+ private final ImmutableList.Builder<IgniteReceiver> remotes = ImmutableList.builder();
+
+ /** */
+ private FragmentProto(long id, IgniteRel root) {
+ this.id = id;
+ this.root = root;
+ }
+
+ /** */
+ Fragment build() {
+ return new Fragment(id, root, remotes.build());
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
similarity index 68%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
index 01746a7..bc57ff8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IdGenerator.java
@@ -14,22 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
-/** Stubs */
-public class Stubs {
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/** */
+public class IdGenerator {
/** */
- public static int intFoo(Object... args) {
- return args == null ? 0 : args.length;
- }
+ private static final AtomicLong ID_GEN = new AtomicLong();
/** */
- public static boolean boolFoo(Object... args) {
- return args == null;
- }
+ private IdGenerator() {}
/** */
- public static String stringFoo(Object... args) {
- return args == null ? "null" : "not null";
+ public static long nextId() {
+ return ID_GEN.getAndIncrement();
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
new file mode 100644
index 0000000..6337018
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+
+/**
+ * Regular query or DML
+ */
+public interface MultiStepPlan extends QueryPlan {
+ /**
+ * @return Query fragments.
+ */
+ List<Fragment> fragments();
+
+ /**
+ * @param fragment Fragment.
+ * @return Mapping for a given fragment.
+ */
+ FragmentMapping mapping(Fragment fragment);
+
+ /** */
+ ColocationGroup target(Fragment fragment);
+
+ /** */
+ Map<Long, List<UUID>> remotes(Fragment fragment);
+
+ /**
+ * Inits query fragments.
+ *
+ * @param ctx Planner context.
+ */
+ void init(PlanningContext ctx);
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
similarity index 59%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
index 01746a7..9ef2cfb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
@@ -14,22 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
-/** Stubs */
-public class Stubs {
- /** */
- public static int intFoo(Object... args) {
- return args == null ? 0 : args.length;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+/**
+ * Distributed query plan.
+ */
+public class MultiStepQueryPlan extends AbstractMultiStepPlan {
+ /**
+ * @param fieldsMeta Fields metadata.
+ */
+ public MultiStepQueryPlan(QueryTemplate queryTemplate, Object fieldsMeta) {
+ super(queryTemplate, fieldsMeta);
}
- /** */
- public static boolean boolFoo(Object... args) {
- return args == null;
+ /** {@inheritDoc} */
+ @Override public Type type() {
+ return Type.QUERY;
}
- /** */
- public static String stringFoo(Object... args) {
- return args == null ? "null" : "not null";
+ /** {@inheritDoc} */
+ @Override public QueryPlan copy() {
+ return new MultiStepQueryPlan(queryTemplate, fieldsMetadata);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
index ec597bb..8a895a4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
@@ -68,6 +68,9 @@ public final class PlanningContext implements Context {
private final Object[] parameters;
/** */
+ private final long topVer;
+
+ /** */
private final IgniteTypeFactory typeFactory;
/** */
@@ -91,11 +94,14 @@ public final class PlanningContext implements Context {
UUID locNodeId,
UUID originatingNodeId,
String qry,
- Object[] parameters) {
+ Object[] parameters,
+ long topVer
+ ) {
this.locNodeId = locNodeId;
this.originatingNodeId = originatingNodeId;
this.qry = qry;
this.parameters = parameters;
+ this.topVer = topVer;
this.parentCtx = Contexts.chain(parentCtx, cfg.getContext());
// link frameworkConfig#context() to this.
@@ -141,6 +147,13 @@ public final class PlanningContext implements Context {
return parameters;
}
+ /**
+ * @return Topology version.
+ */
+ public long topologyVersion() {
+ return topVer;
+ }
+
// Helper methods
/**
* @return Sql operators table.
@@ -276,7 +289,7 @@ public final class PlanningContext implements Context {
/**
* Planner context builder.
*/
- @SuppressWarnings("PublicInnerClass")
+ @SuppressWarnings("PublicInnerClass")
public static class Builder {
/** */
private static final FrameworkConfig EMPTY_CONFIG =
@@ -303,6 +316,9 @@ public final class PlanningContext implements Context {
/** */
private Object[] parameters;
+ /** */
+ private long topVer;
+
/**
* @param locNodeId Local node ID.
* @return Builder for chaining.
@@ -359,12 +375,21 @@ public final class PlanningContext implements Context {
}
/**
+ * @param topVer Topology version.
+ * @return Builder for chaining.
+ */
+ public Builder topologyVersion(long topVer) {
+ this.topVer = topVer;
+ return this;
+ }
+
+ /**
* Builds planner context.
*
* @return Planner context.
*/
public PlanningContext build() {
- return new PlanningContext(frameworkCfg, parentCtx, locNodeId, originatingNodeId, qry, parameters);
+ return new PlanningContext(frameworkCfg, parentCtx, locNodeId, originatingNodeId, qry, parameters, topVer);
}
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
similarity index 65%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
index 01746a7..6f9161b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java
@@ -14,22 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
-/** Stubs */
-public class Stubs {
- /** */
- public static int intFoo(Object... args) {
- return args == null ? 0 : args.length;
- }
+package org.apache.ignite.internal.processors.query.calcite.prepare;
- /** */
- public static boolean boolFoo(Object... args) {
- return args == null;
- }
+/**
+ *
+ */
+public interface QueryPlan {
+ /** Query type */
+ enum Type { QUERY, FRAGMENT, DML, DDL, EXPLAIN }
+
+ /**
+ * @return Query type.
+ */
+ Type type();
- /** */
- public static String stringFoo(Object... args) {
- return args == null ? "null" : "not null";
- }
+ /**
+ * Clones this plan.
+ */
+ QueryPlan copy();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
new file mode 100644
index 0000000..5e2a7c2
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
+/** */
+public class QueryTemplate {
+ /** */
+ private final MappingService mappingService;
+
+ /** */
+ private final ImmutableList<Fragment> fragments;
+
+ /** */
+ private final AtomicReference<ExecutionPlan> executionPlan = new AtomicReference<>();
+
+ /** */
+ public QueryTemplate(MappingService mappingService, List<Fragment> fragments) {
+ this.mappingService = mappingService;
+
+ ImmutableList.Builder<Fragment> b = ImmutableList.builder();
+ for (Fragment fragment : fragments)
+ b.add(fragment.detach());
+
+ this.fragments = b.build();
+ }
+
+ /** */
+ public ExecutionPlan map(PlanningContext ctx) {
+ ExecutionPlan executionPlan = this.executionPlan.get();
+ if (executionPlan != null && Objects.equals(executionPlan.topologyVersion(), ctx.topologyVersion()))
+ return executionPlan;
+
+ List<Fragment> fragments = Commons.transform(this.fragments, f -> f.attach(ctx));
+
+ Exception ex = null;
+ RelMetadataQuery mq = first(fragments).root().getCluster().getMetadataQuery();
+ for (int i = 0; i < 3; i++) {
+ try {
+ ExecutionPlan executionPlan0 = new ExecutionPlan(ctx.topologyVersion(), map(fragments, ctx, mq));
+
+ if (executionPlan == null || executionPlan.topologyVersion() < executionPlan0.topologyVersion())
+ this.executionPlan.compareAndSet(executionPlan, executionPlan0);
+
+ return executionPlan0;
+ }
+ catch (FragmentMappingException e) {
+ if (ex == null)
+ ex = e;
+ else
+ ex.addSuppressed(e);
+
+ fragments = replace(fragments, e.fragment(), new FragmentSplitter(e.node()).go(e.fragment()));
+ }
+ }
+
+ throw new IgniteException("Failed to map query.", ex);
+ }
+
+ /** */
+ @NotNull private List<Fragment> map(List<Fragment> fragments, PlanningContext ctx, RelMetadataQuery mq) {
+ ImmutableList.Builder<Fragment> b = ImmutableList.builder();
+ for (Fragment fragment : fragments)
+ b.add(fragment.map(mappingService, ctx, mq).detach());
+
+ return b.build();
+ }
+
+ /** */
+ private List<Fragment> replace(List<Fragment> fragments, Fragment fragment, List<Fragment> replacement) {
+ assert !nullOrEmpty(replacement);
+
+ Map<Long, Long> newTargets = new HashMap<>();
+ for (Fragment fragment0 : replacement) {
+ for (IgniteReceiver remote : fragment0.remotes())
+ newTargets.put(remote.exchangeId(), fragment0.fragmentId());
+ }
+
+ List<Fragment> fragments0 = new ArrayList<>(fragments.size() + replacement.size() - 1);
+ for (Fragment fragment0 : fragments) {
+ if (fragment0 == fragment)
+ fragment0 = first(replacement);
+ else if (!fragment0.rootFragment()) {
+ IgniteSender sender = (IgniteSender)fragment0.root();
+ Long newTargetId = newTargets.get(sender.exchangeId());
+
+ if (newTargetId != null) {
+ sender = new IgniteSender(sender.getCluster(), sender.getTraitSet(),
+ sender.getInput(), sender.exchangeId(), newTargetId, sender.distribution());
+
+ fragment0 = new Fragment(fragment0.fragmentId(), sender, fragment0.remotes());
+ }
+ }
+
+ fragments0.add(fragment0);
+ }
+
+ fragments0.addAll(replacement.subList(1, replacement.size()));
+
+ return fragments0;
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
new file mode 100644
index 0000000..ec0dd76
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.SourceAwareIgniteRel;
+
+/**
+ * Splits a query into a list of query fragments.
+ */
+public class Splitter extends IgniteRelShuttle {
+ /** */
+ private final Deque<FragmentProto> stack = new LinkedList<>();
+
+ /** */
+ private FragmentProto curr;
+
+ /** */
+ public List<Fragment> go(IgniteRel root) {
+ ArrayList<Fragment> res = new ArrayList<>();
+
+ stack.push(new FragmentProto(IdGenerator.nextId(), root));
+
+ while (!stack.isEmpty()) {
+ curr = stack.pop();
+
+ curr.root = visit(curr.root);
+
+ res.add(curr.build());
+
+ curr = null;
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteReceiver rel) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteExchange rel) {
+ RelOptCluster cluster = rel.getCluster();
+
+ long targetFragmentId = curr.id;
+ long sourceFragmentId = IdGenerator.nextId();
+ long exchangeId = sourceFragmentId;
+
+ IgniteReceiver receiver = new IgniteReceiver(cluster, rel.getTraitSet(), rel.getRowType(), exchangeId, sourceFragmentId);
+ IgniteSender sender = new IgniteSender(cluster, rel.getTraitSet(), rel.getInput(), exchangeId, targetFragmentId,
+ rel.distribution());
+
+ curr.remotes.add(receiver);
+ stack.push(new FragmentProto(sourceFragmentId, sender));
+
+ return receiver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTrimExchange rel) {
+ return ((SourceAwareIgniteRel)processNode(rel)).clone(IdGenerator.nextId());
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteIndexScan rel) {
+ return rel.clone(IdGenerator.nextId());
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTableScan rel) {
+ return rel.clone(IdGenerator.nextId());
+ }
+
+ /** */
+ private static class FragmentProto {
+ /** */
+ private final long id;
+
+ /** */
+ private IgniteRel root;
+
+ /** */
+ private final ImmutableList.Builder<IgniteReceiver> remotes = ImmutableList.builder();
+
+ /** */
+ private FragmentProto(long id, IgniteRel root) {
+ this.id = id;
+ this.root = root;
+ }
+
+ /** */
+ Fragment build() {
+ return new Fragment(id, root, remotes.build());
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteSpool.java
new file mode 100644
index 0000000..6c283c6
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteSpool.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Spool;
+
+/**
+ * Relational operator that returns the contents of a table.
+ */
+public abstract class AbstractIgniteSpool extends Spool implements IgniteRel {
+ /** */
+ public AbstractIgniteSpool(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ Type readType,
+ RelNode input
+ ) {
+ super(cluster, traits, input, readType, Type.EAGER);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelWriter explainTerms(RelWriter pw) {
+ return pw
+ .input("input", getInput())
+ .item("readType", readType.name())
+ .item("writeType", writeType.name());
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashIndexSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashIndexSpool.java
index 77cd1fd..5e96a71 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashIndexSpool.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashIndexSpool.java
@@ -40,7 +40,7 @@ import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
* Relational operator that returns the hashed contents of a table
* and allow to lookup rows by specified keys.
*/
-public class IgniteHashIndexSpool extends Spool implements IgniteRel {
+public class IgniteHashIndexSpool extends AbstractIgniteSpool implements IgniteRel {
/** Search row. */
private final List<RexNode> searchRow;
@@ -58,7 +58,7 @@ public class IgniteHashIndexSpool extends Spool implements IgniteRel {
List<RexNode> searchRow,
RexNode cond
) {
- super(cluster, traits, input, Type.LAZY, Type.EAGER);
+ super(cluster, traits, Type.LAZY, input);
assert !nullOrEmpty(searchRow);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
index 851d99f..c5432b6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
@@ -41,6 +41,7 @@ import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
@@ -86,8 +87,8 @@ public class IgniteMergeJoin extends AbstractIgniteJoin {
input.getExpression("condition"),
ImmutableSet.copyOf(Commons.transform(input.getIntegerList("variablesSet"), CorrelationId::new)),
input.getEnum("joinType", JoinRelType.class),
- null,//((RelInputEx)input).getCollation("leftCollation"),
- null//((RelInputEx)input).getCollation("rightCollation")
+ ((RelInputEx)input).getCollation("leftCollation"),
+ ((RelInputEx)input).getCollation("rightCollation")
);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortedIndexSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortedIndexSpool.java
index 8dfea74..1d16d58 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortedIndexSpool.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSortedIndexSpool.java
@@ -39,7 +39,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
* Relational operator that returns the sorted contents of a table
* and allow to lookup rows by specified bounds.
*/
-public class IgniteSortedIndexSpool extends Spool implements IgniteRel {
+public class IgniteSortedIndexSpool extends AbstractIgniteSpool implements IgniteRel {
/** */
private final RelCollation collation;
@@ -58,7 +58,7 @@ public class IgniteSortedIndexSpool extends Spool implements IgniteRel {
RexNode condition,
IndexConditions idxCond
) {
- super(cluster, traits, input, Type.LAZY, Type.EAGER);
+ super(cluster, traits, Type.LAZY, input);
assert Objects.nonNull(idxCond);
assert Objects.nonNull(condition);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
index aa07c78..031fdeb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableFunctionScan.java
@@ -24,6 +24,7 @@ import java.util.Set;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.calcite.rel.metadata.RelColumnMapping;
@@ -31,6 +32,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
+import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
/**
@@ -52,6 +54,15 @@ public class IgniteTableFunctionScan extends TableFunctionScan implements Ignite
super(cluster, traits, ImmutableList.of(), call, null, rowType, null);
}
+ /**
+ * Constructor used for deserialization.
+ *
+ * @param input Serialized representation.
+ */
+ public IgniteTableFunctionScan(RelInput input) {
+ super(changeTraits(input, IgniteConvention.INSTANCE));
+ }
+
/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
return new IgniteTableFunctionScan(cluster, getTraitSet(), getCall(), getRowType());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
index 166c432..db5a054 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
@@ -73,7 +73,7 @@ public class IgniteTableModify extends TableModify implements IgniteRel {
input.getInput(),
input.getEnum("operation", Operation.class),
input.getStringList("updateColumnList"),
- input.getExpressionList("sourceExpressionList"),
+ input.get("sourceExpressionList") != null ? input.getExpressionList("sourceExpressionList") : null,
input.getBoolean("flattened", true)
);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
index 9c5a266..cdd7907 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.List;
+
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
@@ -32,7 +33,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteC
/**
* Relational operator that returns the contents of a table.
*/
-public class IgniteTableSpool extends Spool implements IgniteRel {
+public class IgniteTableSpool extends AbstractIgniteSpool implements IgniteRel {
/** */
public IgniteTableSpool(
RelOptCluster cluster,
@@ -40,7 +41,7 @@ public class IgniteTableSpool extends Spool implements IgniteRel {
Spool.Type readType,
RelNode input
) {
- super(cluster, traits, input, readType, Type.EAGER);
+ super(cluster, traits, readType, input);
}
/**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 56c1a5a..9554cb4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -25,6 +25,8 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
@@ -76,6 +78,14 @@ public interface IgniteTable extends TranslatableTable {
IgniteLogicalIndexScan toRel(RelOptCluster cluster, RelOptTable relOptTbl, String idxName);
/**
+ * Returns nodes mapping.
+ *
+ * @param ctx Planning context.
+ * @return Nodes mapping.
+ */
+ ColocationGroup colocationGroup(PlanningContext ctx);
+
+ /**
* @return Table distribution.
*/
IgniteDistribution distribution();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
new file mode 100644
index 0000000..8786074
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.util;
+
+import java.lang.reflect.Method;
+
+import org.apache.calcite.linq4j.tree.Types;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMappingMetadata;
+
+/**
+ * Contains methods used in metadata definitions.
+ */
+public enum IgniteMethod {
+ /** See {@link FragmentMappingMetadata#fragmentMapping()} */
+ FRAGMENT_MAPPING(FragmentMappingMetadata.class, "fragmentMapping");
+
+ /** */
+ private final Method method;
+
+ /**
+ * @param clazz Class where to lookup method.
+ * @param methodName Method name.
+ * @param argumentTypes Method parameters types.
+ */
+ IgniteMethod(Class<?> clazz, String methodName, Class<?>... argumentTypes) {
+ method = Types.lookupMethod(clazz, methodName, argumentTypes);
+ }
+
+ /**
+ * @return Method.
+ */
+ public Method method() {
+ return method;
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 01403c5..778df9b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.config.CalciteConnectionConfig;
@@ -37,6 +38,7 @@ import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelDistribution;
@@ -59,9 +61,14 @@ import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql2rel.InitializerContext;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Cloner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerHelper;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
@@ -79,14 +86,18 @@ import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTr
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.util.ArrayUtils;
import static org.apache.calcite.tools.Frameworks.createRootSchema;
import static org.apache.calcite.tools.Frameworks.newConfigBuilder;
+import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonWriter.toJson;
import static org.apache.ignite.internal.processors.query.calcite.util.Commons.FRAMEWORK_CONFIG;
import static org.apache.ignite.internal.util.CollectionUtils.first;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/** */
@@ -207,6 +218,8 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
try {
IgniteRel rel = PlannerHelper.optimize(sqlNode, planner);
+ checkSplitAndSerialization(rel, publicSchema);
+
// System.out.println(RelOptUtil.toString(rel));
return rel;
@@ -398,6 +411,84 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
}
/** */
+ protected void checkSplitAndSerialization(IgniteRel rel, IgniteSchema publicSchema) {
+ assertNotNull(rel);
+
+ rel = Cloner.clone(rel);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ List<Fragment> fragments = new Splitter().go(rel);
+ List<String> serialized = new ArrayList<>(fragments.size());
+
+ for (Fragment fragment : fragments)
+ serialized.add(toJson(fragment.root()));
+
+ assertNotNull(serialized);
+
+ RelTraitDef<?>[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE,
+ RewindabilityTraitDef.INSTANCE,
+ CorrelationTraitDef.INSTANCE
+ };
+
+ List<UUID> nodes = new ArrayList<>(4);
+
+ for (int i = 0; i < 4; i++)
+ nodes.add(UUID.randomUUID());
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(nodes))
+ .originatingNodeId(first(nodes))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .build();
+
+ List<RelNode> deserializedNodes = new ArrayList<>();
+
+ try (IgnitePlanner ignored = ctx.planner()) {
+ for (String s : serialized) {
+ RelJsonReader reader = new RelJsonReader(ctx.cluster(), ctx.catalogReader());
+ deserializedNodes.add(reader.read(s));
+ }
+ }
+
+ List<RelNode> expectedRels = fragments.stream()
+ .map(Fragment::root)
+ .collect(Collectors.toList());
+
+ assertEquals(expectedRels.size(), deserializedNodes.size(), "Invalid deserialization fragments count");
+
+ for (int i = 0; i < expectedRels.size(); ++i) {
+ RelNode expected = expectedRels.get(i);
+ RelNode deserialized = deserializedNodes.get(i);
+
+ clearTraits(expected);
+ clearTraits(deserialized);
+
+ if (!expected.deepEquals(deserialized))
+ assertTrue(
+ expected.deepEquals(deserialized),
+ "Invalid serialization / deserialization.\n" +
+ "Expected:\n" + RelOptUtil.toString(expected) +
+ "Deserialized:\n" + RelOptUtil.toString(deserialized)
+ );
+ }
+ }
+
+ /** */
+ protected void clearTraits(RelNode rel) {
+ IgniteTestUtils.setFieldValue(rel, AbstractRelNode.class, "traitSet", RelTraitSet.createEmpty());
+ rel.getInputs().forEach(this::clearTraits);
+ }
+
+ /** */
abstract static class TestTable implements IgniteTable {
/** */
private final String name;
@@ -531,6 +622,11 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
}
/** {@inheritDoc} */
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteDistribution distribution() {
throw new AssertionError();
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
new file mode 100644
index 0000000..4fe7ef4
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -0,0 +1,1847 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.calcite.tools.Frameworks.createRootSchema;
+import static org.apache.calcite.tools.Frameworks.newConfigBuilder;
+import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonWriter.toJson;
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.FRAMEWORK_CONFIG;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ *
+ */
+public class PlannerTest extends AbstractPlannerTest {
+ /** */
+ private static List<UUID> NODES;
+
+ @BeforeAll
+ public static void init() {
+ NODES = new ArrayList<>(4);
+
+ for (int i = 0; i < 4; i++)
+ NODES.add(UUID.randomUUID());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testLogicalPlan() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable developer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("PROJECTID", f.createJavaType(Integer.class))
+ .build()) {
+ };
+
+ TestTable project = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("VER", f.createJavaType(Integer.class))
+ .build()) {
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("DEVELOPER", developer);
+ publicSchema.addTable("PROJECT", project);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.projectId = p.id0 + 1" +
+ "WHERE (d.projectId + 1) > ?";
+
+ RelTraitDef<?>[] traitDefs = {
+ ConventionTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(qry);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+ }
+
+ assertNotNull(relRoot.rel);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testLogicalPlanDefaultSchema() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable developer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("PROJECTID", f.createJavaType(Integer.class))
+ .build()) {
+ };
+
+ TestTable project = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("VER", f.createJavaType(Integer.class))
+ .build()) {
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("DEVELOPER", developer);
+ publicSchema.addTable("PROJECT", project);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM Project pp" +
+ ") p " +
+ "ON d.projectId = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ RelTraitDef<?>[] traitDefs = {
+ ConventionTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(qry);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+ }
+
+ assertNotNull(relRoot.rel);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCorrelatedQuery() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable developer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("PROJECTID", f.createJavaType(Integer.class))
+ .build()) {
+ };
+
+ TestTable project = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("VER", f.createJavaType(Integer.class))
+ .build()) {
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("DEVELOPER", developer);
+ publicSchema.addTable("PROJECT", project);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "SELECT d.id, (SELECT p.name FROM Project p WHERE p.id = d.id) name, d.projectId " +
+ "FROM Developer d";
+
+ RelTraitDef<?>[] traitDefs = {
+ ConventionTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(qry);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+ }
+
+ assertNotNull(relRoot.rel);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testHepPlaner() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable developer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("PROJECTID", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public IgniteIndex getIndex(String idxName) {
+ return new IgniteIndex(null, null, null);
+ }
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(NODES, 0, 1),
+ select(NODES, 1, 2),
+ select(NODES, 2, 0),
+ select(NODES, 0, 1),
+ select(NODES, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Developer", "hash");
+ }
+ };
+
+ TestTable project = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("VER", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public IgniteIndex getIndex(String idxName) {
+ return new IgniteIndex(null, null, null);
+ }
+
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(NODES, 0, 1),
+ select(NODES, 1, 2),
+ select(NODES, 2, 0),
+ select(NODES, 0, 1),
+ select(NODES, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Project", "hash");
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("DEVELOPER", developer);
+ publicSchema.addTable("PROJECT", project);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.projectId = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ RelTraitDef<?>[] traitDefs = {
+ ConventionTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(qry);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot.rel);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testVolcanoPlanerDistributed() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable developer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("PROJECTID", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Developer", "hash");
+ }
+ };
+
+ TestTable project = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("VER", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Project", "hash");
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("DEVELOPER", developer);
+ publicSchema.addTable("PROJECT", project);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d " +
+ "JOIN (" +
+ " SELECT pp.id as id0, pp.ver as ver0 " +
+ " FROM PUBLIC.Project pp" +
+ ") p ON d.projectId = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ RelTraitDef<?>[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE,
+ RewindabilityTraitDef.INSTANCE,
+ CorrelationTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(qry);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+ // Transformation chain
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot.rel);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSplitterColocatedPartitionedPartitioned() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable developer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("PROJECTID", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(NODES, 0, 1),
+ select(NODES, 1, 2),
+ select(NODES, 2, 0),
+ select(NODES, 0, 1),
+ select(NODES, 1, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Developer", "hash");
+ }
+ };
+
+ TestTable project = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("VER", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(NODES, 0, 1),
+ select(NODES, 1, 2),
+ select(NODES, 2, 0),
+ select(NODES, 0, 1),
+ select(NODES, 1, 2)));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Project", "hash");
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("DEVELOPER", developer);
+ publicSchema.addTable("PROJECT", project);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.id = p.id0";
+
+ RelTraitDef<?>[] traitDefs = {
+ ConventionTraitDef.INSTANCE,
+ DistributionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE,
+ RewindabilityTraitDef.INSTANCE,
+ CorrelationTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ assertNotNull(ctx);
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
+ new Splitter().go((IgniteRel) relRoot.rel)), null);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertNotNull(plan);
+
+ assertEquals(2, plan.fragments().size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSplitterColocatedReplicatedReplicated() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable developer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("PROJECTID", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forNodes(select(NODES, 0, 1, 2, 3));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ TestTable project = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("VER", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forNodes(select(NODES, 0, 1, 2, 3));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("DEVELOPER", developer);
+ publicSchema.addTable("PROJECT", project);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.id = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ RelTraitDef<?>[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE,
+ RewindabilityTraitDef.INSTANCE,
+ CorrelationTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
+ new Splitter().go((IgniteRel) relRoot.rel)), null);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertNotNull(plan);
+
+ assertEquals(1, plan.fragments().size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSplitterPartiallyColocatedReplicatedAndPartitioned() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable developer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("PROJECTID", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forNodes(select(NODES, 0));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ TestTable project = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("VER", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(NODES, 1, 2),
+ select(NODES, 2, 3),
+ select(NODES, 3, 0),
+ select(NODES, 0, 1)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Project", "hash");
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("DEVELOPER", developer);
+ publicSchema.addTable("PROJECT", project);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.id = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ RelTraitDef<?>[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE,
+ RewindabilityTraitDef.INSTANCE,
+ CorrelationTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
+ new Splitter().go((IgniteRel) relRoot.rel)), null);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertEquals(3, plan.fragments().size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSplitterPartiallyColocated1() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable developer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("PROJECTID", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forNodes(select(NODES, 1, 2, 3));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ TestTable project = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("VER", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(NODES, 0),
+ select(NODES, 1),
+ select(NODES, 2)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Project", "hash");
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("DEVELOPER", developer);
+ publicSchema.addTable("PROJECT", project);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.projectId = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ RelTraitDef<?>[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE,
+ RewindabilityTraitDef.INSTANCE,
+ CorrelationTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
+ new Splitter().go((IgniteRel) relRoot.rel)), null);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertNotNull(plan);
+
+ assertEquals(3, plan.fragments().size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSplitterPartiallyColocated2() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable developer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("PROJECTID", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forNodes(select(NODES, 0));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ TestTable project = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("VER", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forAssignments(Arrays.asList(
+ select(NODES, 1),
+ select(NODES, 2),
+ select(NODES, 3)
+ ));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Project", "hash");
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("DEVELOPER", developer);
+ publicSchema.addTable("PROJECT", project);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.projectId = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ RelTraitDef<?>[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE,
+ RewindabilityTraitDef.INSTANCE,
+ CorrelationTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
+ new Splitter().go((IgniteRel) relRoot.rel)), null);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertEquals(3, plan.fragments().size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSplitterNonColocated() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable developer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("PROJECTID", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forNodes(select(NODES, 2));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ TestTable project = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("VER", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ return ColocationGroup.forNodes(select(NODES, 0, 1));
+ }
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("DEVELOPER", developer);
+ publicSchema.addTable("PROJECT", project);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.projectId = p.ver0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ RelTraitDef<?>[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE,
+ RewindabilityTraitDef.INSTANCE,
+ CorrelationTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ planner.setDisabledRules(ImmutableSet.of("CorrelatedNestedLoopJoin"));
+
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(planner);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+ relRoot = relRoot.withRel(rel).withKind(sqlNode.getKind());
+ }
+
+ assertNotNull(relRoot);
+
+ MultiStepPlan plan = new MultiStepQueryPlan(new QueryTemplate(this::intermediateMapping,
+ new Splitter().go((IgniteRel) relRoot.rel)), null);
+
+ assertNotNull(plan);
+
+ plan.init(ctx);
+
+ assertNotNull(plan);
+
+ assertEquals(2, plan.fragments().size());
+ }
+
+ /** */
+ @Test
+ public void testSerializationDeserialization() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable developer = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("PROJECTID", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Developer", "hash");
+ }
+ };
+
+ TestTable project = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("VER", f.createJavaType(Integer.class))
+ .build()) {
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "Project", "hash");
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("DEVELOPER", developer);
+ publicSchema.addTable("PROJECT", project);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+ "FROM PUBLIC.Developer d JOIN (" +
+ "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+ ") p " +
+ "ON d.projectId = p.id0 " +
+ "WHERE (d.projectId + 1) > ?";
+
+ RelTraitDef<?>[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE,
+ RewindabilityTraitDef.INSTANCE,
+ CorrelationTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ RelNode rel;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(qry);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ rel = planner.rel(sqlNode).rel;
+
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+ // Transformation chain
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single())
+ .simplify();
+
+ rel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+ }
+
+ assertNotNull(rel);
+
+ List<Fragment> fragments = new Splitter().go((IgniteRel)rel);
+ List<String> serialized = new ArrayList<>(fragments.size());
+
+ for (Fragment fragment : fragments)
+ serialized.add(toJson(fragment.root()));
+
+ assertNotNull(serialized);
+
+ ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ List<RelNode> nodes = new ArrayList<>();
+
+ try (IgnitePlanner ignored = ctx.planner()) {
+ for (String s : serialized) {
+ RelJsonReader reader = new RelJsonReader(ctx.cluster(), ctx.catalogReader());
+ nodes.add(reader.read(s));
+ }
+ }
+
+ assertNotNull(nodes);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMergeFilters() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable testTbl = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.single();
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TEST", testTbl);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "" +
+ "SELECT val from (\n" +
+ " SELECT * \n" +
+ " FROM TEST \n" +
+ " WHERE VAL = 10) \n" +
+ "WHERE VAL = 10";
+
+ RelTraitDef<?>[] traitDefs = {
+ ConventionTraitDef.INSTANCE,
+ DistributionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE,
+ RewindabilityTraitDef.INSTANCE,
+ CorrelationTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .build())
+ .query(sql)
+ .parameters(2)
+ .build();
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(qry);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ // Transformation chain
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+ RelTraitSet desired = rel.getCluster()
+ .traitSetOf(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single());
+
+ RelNode phys = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+ assertNotNull(phys);
+
+ AtomicInteger filterCnt = new AtomicInteger();
+
+ // Counts filters af the plan.
+ phys.childrenAccept(
+ new RelVisitor() {
+ @Override public void visit(RelNode node, int ordinal, RelNode parent) {
+ if (node instanceof IgniteFilter)
+ filterCnt.incrementAndGet();
+
+ super.visit(node, ordinal, parent);
+ }
+ }
+ );
+
+ // Checks that two filter merged into one filter.
+ // Expected plan:
+ // IgniteProject(VAL=[$1])
+ // IgniteProject(ID=[$0], VAL=[$1])
+ // IgniteFilter(condition=[=(CAST($1):INTEGER, 10)])
+ // IgniteTableScan(table=[[PUBLIC, TEST]])
+ assertEquals(0, filterCnt.get());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testJoinPushExpressionRule() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable emp = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("DEPTNO", f.createJavaType(Integer.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ TestTable dept = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("DEPTNO", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("EMP", emp);
+ publicSchema.addTable("DEPT", dept);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "select d.deptno, e.deptno " +
+ "from dept d, emp e " +
+ "where d.deptno + e.deptno = 2";
+
+ RelTraitDef<?>[] traitDefs = {
+ DistributionTraitDef.INSTANCE,
+ ConventionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE,
+ RewindabilityTraitDef.INSTANCE,
+ CorrelationTraitDef.INSTANCE
+ };
+
+ PlanningContext ctx = PlanningContext.builder()
+ .localNodeId(first(NODES))
+ .originatingNodeId(first(NODES))
+ .parentContext(Contexts.empty())
+ .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .traitDefs(traitDefs)
+ .costFactory(new IgniteCostFactory(1, 100, 1, 1))
+ .build())
+ .query(sql)
+ .build();
+
+ RelRoot relRoot;
+
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ String qry = ctx.query();
+
+ assertNotNull(qry);
+
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
+
+ // Validate
+ sqlNode = planner.validate(sqlNode);
+
+ // Convert to Relational operators graph
+ relRoot = planner.rel(sqlNode);
+
+ RelNode rel = relRoot.rel;
+
+ assertNotNull(rel);
+ assertEquals("" +
+ "LogicalFilter(condition=[=(CAST(+($0, $1)):INTEGER, 2)])\n" +
+ " LogicalJoin(condition=[true], joinType=[inner])\n" +
+ " LogicalProject(DEPTNO=[$0])\n" +
+ " IgniteLogicalTableScan(table=[[PUBLIC, DEPT]])\n" +
+ " LogicalProject(DEPTNO=[$2])\n" +
+ " IgniteLogicalTableScan(table=[[PUBLIC, EMP]])\n",
+ RelOptUtil.toString(rel));
+
+ // Transformation chain
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single())
+ .replace(CorrelationTrait.UNCORRELATED)
+ .simplify();
+
+ IgniteRel phys = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+ assertNotNull(phys);
+ assertEquals(
+ "IgniteCorrelatedNestedLoopJoin(condition=[=(CAST(+($0, $1)):INTEGER, 2)], joinType=[inner], " +
+ "correlationVariables=[[$cor1]])\n" +
+ " IgniteTableScan(table=[[PUBLIC, DEPT]], requiredColumns=[{0}])\n" +
+ " IgniteTableScan(table=[[PUBLIC, EMP]], filters=[=(CAST(+($cor1.DEPTNO, $t0)):INTEGER, 2)], requiredColumns=[{2}])\n",
+ RelOptUtil.toString(phys),
+ "Invalid plan:\n" + RelOptUtil.toString(phys)
+ );
+
+ checkSplitAndSerialization(phys, publicSchema);
+ }
+ }
+
+ /** */
+ @Test
+ public void testMergeJoin() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable emp = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("DEPTNO", f.createJavaType(Integer.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ emp.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "emp_idx", emp));
+
+ TestTable dept = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("DEPTNO", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ dept.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "dep_idx", dept));
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("EMP", emp);
+ publicSchema.addTable("DEPT", dept);
+
+ SchemaPlus schema = createRootSchema(false)
+ .add("PUBLIC", publicSchema);
+
+ String sql = "select * from dept d join emp e on d.deptno = e.deptno and e.name = d.name order by e.name, d.deptno";
+
+ IgniteRel phys = physicalPlan(sql, publicSchema);
+
+ assertNotNull(phys);
+ assertEquals("" +
+ "IgniteMergeJoin(condition=[AND(=($0, $4), =($3, $1))], joinType=[inner], leftCollation=[[0, 1]], " +
+ "rightCollation=[[2, 1]])\n" +
+ " IgniteIndexScan(table=[[PUBLIC, DEPT]], index=[dep_idx])\n" +
+ " IgniteIndexScan(table=[[PUBLIC, EMP]], index=[emp_idx])\n",
+ RelOptUtil.toString(phys));
+
+ checkSplitAndSerialization(phys, publicSchema);
+ }
+
+ /** */
+ @Test
+ public void testMergeJoinIsNotAppliedForNonEquiJoin() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable emp = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .add("DEPTNO", f.createJavaType(Integer.class))
+ .build(), RewindabilityTrait.REWINDABLE, 1000) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ emp.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(1, 2)), "emp_idx", emp));
+
+ TestTable dept = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("DEPTNO", f.createJavaType(Integer.class))
+ .add("NAME", f.createJavaType(String.class))
+ .build(), RewindabilityTrait.REWINDABLE, 100) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ dept.addIndex(new IgniteIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "dep_idx", dept));
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("EMP", emp);
+ publicSchema.addTable("DEPT", dept);
+
+ String sql = "select * from dept d join emp e on d.deptno = e.deptno and e.name >= d.name order by e.name, d.deptno";
+
+ RelNode phys = physicalPlan(sql, publicSchema, "CorrelatedNestedLoopJoin");
+
+ assertNotNull(phys);
+ assertEquals("" +
+ "IgniteProject(DEPTNO=[$3], NAME=[$4], ID=[$0], NAME0=[$1], DEPTNO0=[$2])\n" +
+ " IgniteSort(sort0=[$1], sort1=[$3], dir0=[ASC], dir1=[ASC])\n" +
+ " IgniteNestedLoopJoin(condition=[AND(=($3, $2), >=($1, $4))], joinType=[inner])\n" +
+ " IgniteIndexScan(table=[[PUBLIC, EMP]], index=[emp_idx])\n" +
+ " IgniteIndexScan(table=[[PUBLIC, DEPT]], index=[dep_idx])\n",
+ RelOptUtil.toString(phys));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testLimit() throws Exception {
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ TestTable testTbl = new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.broadcast();
+ }
+ };
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TEST", testTbl);
+
+ String sql = "SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 10 ROWS ONLY";
+
+ {
+ IgniteRel phys = physicalPlan(sql, publicSchema);
+
+ assertNotNull(phys);
+
+ AtomicInteger limit = new AtomicInteger();
+ AtomicBoolean sort = new AtomicBoolean();
+
+ relTreeVisit(phys, (node, ordinal, parent) -> {
+ if (node instanceof IgniteLimit)
+ limit.incrementAndGet();
+
+ if (node instanceof IgniteSort)
+ sort.set(true);
+ }
+ );
+
+ String errMsg = "Invalid plan: \n" + RelOptUtil.toString(phys);
+
+ assertEquals(1, limit.get(), errMsg);
+ assertFalse(sort.get(), errMsg);
+
+ checkSplitAndSerialization(phys, publicSchema);
+ }
+
+ sql = "SELECT * FROM TEST ORDER BY ID OFFSET 10 ROWS FETCH FIRST 10 ROWS ONLY";
+
+ {
+ IgniteRel phys = physicalPlan(sql, publicSchema);
+
+ assertNotNull(phys);
+
+ AtomicInteger limit = new AtomicInteger();
+ AtomicBoolean sort = new AtomicBoolean();
+
+ relTreeVisit(phys, (node, ordinal, parent) -> {
+ if (node instanceof IgniteLimit)
+ limit.incrementAndGet();
+
+ if (node instanceof IgniteSort)
+ sort.set(true);
+ }
+ );
+
+ String errMsg = "Invalid plan: \n" + RelOptUtil.toString(phys);
+
+ assertEquals(1, limit.get(), errMsg);
+ assertTrue(sort.get(), errMsg);
+
+ checkSplitAndSerialization(phys, publicSchema);
+ }
+ }
+
+ /** */
+ @Test
+ public void testNotStandardFunctions() throws Exception {
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+ IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ publicSchema.addTable(
+ "TEST",
+ new TestTable(
+ new RelDataTypeFactory.Builder(f)
+ .add("ID", f.createJavaType(Integer.class))
+ .add("VAL", f.createJavaType(String.class))
+ .build()) {
+
+ @Override public IgniteDistribution distribution() {
+ return IgniteDistributions.affinity(0, "TEST", "hash");
+ }
+ }
+ );
+
+ String[] queries = {
+ "select REVERSE(val) from TEST", // MYSQL
+ "select TO_DATE(val, 'yyyymmdd') from TEST" // ORACLE
+ };
+
+ for (String sql : queries) {
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema
+ );
+
+ checkSplitAndSerialization(phys, publicSchema);
+ }
+ }
+
+ /** */
+ private List<UUID> intermediateMapping(long topVer, boolean single,
+ @Nullable Predicate<ClusterNode> filter) {
+ return single ? select(NODES, 0) : select(NODES, 0, 1, 2, 3);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
index e37f524..7b1fdf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
@@ -194,6 +194,62 @@ public final class ArrayUtils {
}
/**
+ * @param arr Array to check.
+ * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+ */
+ public static boolean nullOrEmpty(byte[] arr) {
+ return arr == null || arr.length == 0;
+ }
+
+ /**
+ * @param arr Array to check.
+ * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+ */
+ public static boolean nullOrEmpty(short[] arr) {
+ return arr == null || arr.length == 0;
+ }
+
+ /**
+ * @param arr Array to check.
+ * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+ */
+ public static boolean nullOrEmpty(int[] arr) {
+ return arr == null || arr.length == 0;
+ }
+
+ /**
+ * @param arr Array to check.
+ * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+ */
+ public static boolean nullOrEmpty(long[] arr) {
+ return arr == null || arr.length == 0;
+ }
+
+ /**
+ * @param arr Array to check.
+ * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+ */
+ public static boolean nullOrEmpty(float[] arr) {
+ return arr == null || arr.length == 0;
+ }
+
+ /**
+ * @param arr Array to check.
+ * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+ */
+ public static boolean nullOrEmpty(double[] arr) {
+ return arr == null || arr.length == 0;
+ }
+
+ /**
+ * @param arr Array to check.
+ * @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
+ */
+ public static boolean nullOrEmpty(boolean[] arr) {
+ return arr == null || arr.length == 0;
+ }
+
+ /**
* Converts array to {@link List}. Note that resulting list cannot
* be altered in size, as it it based on the passed in array -
* only current elements can be changed.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIntIterator.java
similarity index 65%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
copy to modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIntIterator.java
index 01746a7..5637411 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Stubs.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIntIterator.java
@@ -14,22 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
-/** Stubs */
-public class Stubs {
- /** */
- public static int intFoo(Object... args) {
- return args == null ? 0 : args.length;
- }
+package org.apache.ignite.internal.util;
- /** */
- public static boolean boolFoo(Object... args) {
- return args == null;
- }
+/**
+ * Iterator over integer primitives.
+ */
+public interface IgniteIntIterator {
+ /**
+ * @return {@code true} if the iteration has more elements.
+ */
+ public boolean hasNext();
- /** */
- public static String stringFoo(Object... args) {
- return args == null ? "null" : "not null";
- }
+ /**
+ * @return Next int.
+ */
+ public int next();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIntList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIntList.java
new file mode 100644
index 0000000..bf5882a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteIntList.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
+
+/**
+ * Minimal list API to work with primitive ints. This list exists
+ * to avoid boxing/unboxing when using standard list from Java.
+ */
+public class IgniteIntList implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private int[] arr;
+
+ /** */
+ private int idx;
+
+ /**
+ *
+ */
+ public IgniteIntList() {
+ // No-op.
+ }
+
+ /**
+ * @param size Size.
+ */
+ public IgniteIntList(int size) {
+ arr = new int[size];
+ // idx = 0
+ }
+
+ /**
+ * @param arr Array.
+ */
+ public IgniteIntList(int[] arr) {
+ this.arr = arr;
+
+ idx = arr.length;
+ }
+
+ /**
+ * @param vals Values.
+ * @return List from values.
+ */
+ public static IgniteIntList asList(int... vals) {
+ if (nullOrEmpty(vals))
+ return new IgniteIntList();
+
+ return new IgniteIntList(vals);
+ }
+
+ /**
+ * @param arr Array.
+ * @param size Size.
+ */
+ private IgniteIntList(int[] arr, int size) {
+ this.arr = arr;
+ idx = size;
+ }
+
+ /**
+ * @return Copy of this list.
+ */
+ public IgniteIntList copy() {
+ if (idx == 0)
+ return new IgniteIntList();
+
+ return new IgniteIntList(Arrays.copyOf(arr, idx));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof IgniteIntList))
+ return false;
+
+ IgniteIntList that = (IgniteIntList)o;
+
+ if (idx != that.idx)
+ return false;
+
+ if (idx == 0 || arr == that.arr)
+ return true;
+
+ for (int i = 0; i < idx; i++) {
+ if (arr[i] != that.arr[i])
+ return false;
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = 1;
+
+ for (int i = 0; i < idx; i++) {
+ int element = arr[i];
+ res = 31 * res + element;
+ }
+
+ return res;
+ }
+
+ /**
+ * @param l List to add all elements of.
+ */
+ public void addAll(IgniteIntList l) {
+ assert l != null;
+
+ if (l.isEmpty())
+ return;
+
+ if (arr == null)
+ arr = new int[4];
+
+ int len = arr.length;
+
+ while (len < idx + l.size())
+ len <<= 1;
+
+ arr = Arrays.copyOf(arr, len);
+
+ System.arraycopy(l.arr, 0, arr, idx, l.size());
+
+ idx += l.size();
+ }
+
+ /**
+ * Add element to this array.
+ * @param x Value.
+ */
+ public void add(int x) {
+ if (arr == null)
+ arr = new int[4];
+ else if (arr.length == idx)
+ arr = Arrays.copyOf(arr, arr.length << 1);
+
+ arr[idx++] = x;
+ }
+
+ /**
+ * Clears the list.
+ */
+ public void clear() {
+ idx = 0;
+ }
+
+ /**
+ * Gets the last element.
+ *
+ * @return The last element.
+ */
+ public int last() {
+ return arr[idx - 1];
+ }
+
+ /**
+ * Removes and returns the last element of the list. Complementary method to {@link #add(int)} for stack like usage.
+ *
+ * @return Removed element.
+ * @throws NoSuchElementException If the list is empty.
+ */
+ public int remove() throws NoSuchElementException {
+ if (idx == 0)
+ throw new NoSuchElementException();
+
+ return arr[--idx];
+ }
+
+ /**
+ * Returns (possibly reordered) copy of this list, excluding all elements of given list.
+ *
+ * @param l List of elements to remove.
+ * @return New list without all elements from {@code l}.
+ */
+ public IgniteIntList copyWithout(IgniteIntList l) {
+ assert l != null;
+
+ if (idx == 0)
+ return new IgniteIntList();
+
+ if (l.idx == 0)
+ return new IgniteIntList(Arrays.copyOf(arr, idx));
+
+ int[] newArr = Arrays.copyOf(arr, idx);
+ int newIdx = idx;
+
+ for (int i = 0; i < l.size(); i++) {
+ int rmVal = l.get(i);
+
+ for (int j = 0; j < newIdx; j++) {
+ if (newArr[j] == rmVal) {
+
+ while (newIdx > 0 && newArr[newIdx - 1] == rmVal)
+ newIdx--;
+
+ if (newIdx > 0) {
+ newArr[j] = newArr[newIdx - 1];
+ newIdx--;
+ }
+ }
+ }
+ }
+
+ return new IgniteIntList(newArr, newIdx);
+ }
+
+ /**
+ * @param i Index.
+ * @return Value.
+ */
+ public int get(int i) {
+ assert i < idx;
+
+ return arr[i];
+ }
+
+ /**
+ * @return Size.
+ */
+ public int size() {
+ return idx;
+ }
+
+ /**
+ * @return {@code True} if this list has no elements.
+ */
+ public boolean isEmpty() {
+ return idx == 0;
+ }
+
+ /**
+ * @param l Element to find.
+ * @return {@code True} if found.
+ */
+ public boolean contains(int l) {
+ for (int i = 0; i < idx; i++) {
+ if (arr[i] == l)
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param l List to check.
+ * @return {@code True} if this list contains all the elements of passed in list.
+ */
+ public boolean containsAll(IgniteIntList l) {
+ for (int i = 0; i < l.size(); i++) {
+ if (!contains(l.get(i)))
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * @return {@code True} if there are no duplicates.
+ */
+ public boolean distinct() {
+ for (int i = 0; i < idx; i++) {
+ for (int j = i + 1; j < idx; j++) {
+ if (arr[i] == arr[j])
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * @param size New size.
+ * @param last If {@code true} the last elements will be removed, otherwise the first.
+ */
+ public void truncate(int size, boolean last) {
+ assert size >= 0 && size <= idx;
+
+ if (size == idx)
+ return;
+
+ if (!last && idx != 0 && size != 0)
+ System.arraycopy(arr, idx - size, arr, 0, size);
+
+ idx = size;
+ }
+
+ /**
+ * Removes element by given index.
+ *
+ * @param i Index.
+ * @return Removed value.
+ */
+ public int removeIndex(int i) {
+ assert i < idx : i;
+
+ int res = arr[i];
+
+ if (i == idx - 1) { // Last element.
+ idx = i;
+ }
+ else {
+ System.arraycopy(arr, i + 1, arr, i, idx - i - 1);
+ idx--;
+ }
+
+ return res;
+ }
+
+ /**
+ * Removes value from this list.
+ *
+ * @param startIdx Index to begin search with.
+ * @param val Value.
+ * @return Index of removed value if the value was found and removed or {@code -1} otherwise.
+ */
+ public int removeValue(int startIdx, int val) {
+ assert startIdx >= 0;
+
+ for (int i = startIdx; i < idx; i++) {
+ if (arr[i] == val) {
+ removeIndex(i);
+
+ return i;
+ }
+ }
+
+ return -1;
+ }
+
+ /**
+ * Removes value from this list.
+ *
+ * @param startIdx Index to begin search with.
+ * @param oldVal Old value.
+ * @param newVal New value.
+ * @return Index of replaced value if the value was found and replaced or {@code -1} otherwise.
+ */
+ public int replaceValue(int startIdx, int oldVal, int newVal) {
+ for (int i = startIdx; i < idx; i++) {
+ if (arr[i] == oldVal) {
+ arr[i] = newVal;
+
+ return i;
+ }
+ }
+
+ return -1;
+ }
+
+ /**
+ * @return Array copy.
+ */
+ public int[] array() {
+ int[] res = new int[idx];
+
+ System.arraycopy(arr, 0, res, 0, idx);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(idx);
+
+ for (int i = 0; i < idx; i++)
+ out.writeInt(arr[i]);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ idx = in.readInt();
+
+ arr = new int[idx];
+
+ for (int i = 0; i < idx; i++)
+ arr[i] = in.readInt();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder b = new StringBuilder("[");
+
+ for (int i = 0; i < idx; i++) {
+ if (i != 0)
+ b.append(',');
+
+ b.append(arr[i]);
+ }
+
+ b.append(']');
+
+ return b.toString();
+ }
+
+ /**
+ * @param in Input to read list from.
+ * @return Grid int list.
+ * @throws IOException If failed.
+ */
+ @Nullable public static IgniteIntList readFrom(DataInput in) throws IOException {
+ int idx = in.readInt();
+
+ if (idx == -1)
+ return null;
+
+ int[] arr = new int[idx];
+
+ for (int i = 0; i < idx; i++)
+ arr[i] = in.readInt();
+
+ return new IgniteIntList(arr);
+ }
+
+ /**
+ * @param out Output to write to.
+ * @param list List.
+ * @throws IOException If failed.
+ */
+ public static void writeTo(DataOutput out, @Nullable IgniteIntList list) throws IOException {
+ out.writeInt(list != null ? list.idx : -1);
+
+ if (list != null) {
+ for (int i = 0; i < list.idx; i++)
+ out.writeInt(list.arr[i]);
+ }
+ }
+
+ /**
+ * @param to To list.
+ * @param from From list.
+ * @return To list (passed in or created).
+ */
+ public static IgniteIntList addAll(@Nullable IgniteIntList to, IgniteIntList from) {
+ if (to == null) {
+ IgniteIntList res = new IgniteIntList(from.size());
+
+ res.addAll(from);
+
+ return res;
+ }
+ else {
+ to.addAll(from);
+
+ return to;
+ }
+ }
+
+ /**
+ * Sorts this list.
+ * Use {@code copy().sort()} if you need a defensive copy.
+ *
+ * @return {@code this} For chaining.
+ */
+ public IgniteIntList sort() {
+ if (idx > 1)
+ Arrays.sort(arr, 0, idx);
+
+ return this;
+ }
+
+ /**
+ * Removes given number of elements from the end. If the given number of elements is higher than
+ * list size, then list will be cleared.
+ *
+ * @param cnt Count to pop from the end.
+ */
+ public void pop(int cnt) {
+ assert cnt >= 0 : cnt;
+
+ if (idx < cnt)
+ idx = 0;
+ else
+ idx -= cnt;
+ }
+
+ /**
+ * @return Iterator.
+ */
+ public IgniteIntIterator iterator() {
+ return new IgniteIntIterator() {
+ int c = 0;
+
+ @Override public boolean hasNext() {
+ return c < idx;
+ }
+
+ @Override public int next() {
+ return arr[c++];
+ }
+ };
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index b54cf3f..24cc6c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -19,6 +19,10 @@ package org.apache.ignite.internal.util;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
import org.jetbrains.annotations.Nullable;
@@ -35,11 +39,28 @@ public class IgniteUtils {
/** Class loader used to load Ignite. */
private static final ClassLoader igniteClassLoader = IgniteUtils.class.getClassLoader();
+ /** Primitive class map. */
+ private static final Map<String, Class<?>> primitiveMap = new HashMap<>(16, .5f);
+
+ /** */
+ private static final ConcurrentMap<ClassLoader, ConcurrentMap<String, Class>> classCache =
+ new ConcurrentHashMap<>();
+
/*
Initializes enterprise check.
*/
static {
IgniteUtils.jdkVer = System.getProperty("java.specification.version");
+
+ primitiveMap.put("byte", byte.class);
+ primitiveMap.put("short", short.class);
+ primitiveMap.put("int", int.class);
+ primitiveMap.put("long", long.class);
+ primitiveMap.put("float", float.class);
+ primitiveMap.put("double", double.class);
+ primitiveMap.put("char", char.class);
+ primitiveMap.put("boolean", boolean.class);
+ primitiveMap.put("void", void.class);
}
/**
@@ -253,4 +274,65 @@ public class IgniteUtils {
public static ClassLoader igniteClassLoader() {
return igniteClassLoader;
}
+
+ /**
+ * Gets class for provided name. Accepts primitive types names.
+ *
+ * @param clsName Class name.
+ * @param ldr Class loader.
+ * @return Class.
+ * @throws ClassNotFoundException If class not found.
+ */
+ public static Class<?> forName(String clsName, @Nullable ClassLoader ldr) throws ClassNotFoundException {
+ return forName(clsName, ldr, null);
+ }
+
+ /**
+ * Gets class for provided name. Accepts primitive types names.
+ *
+ * @param clsName Class name.
+ * @param ldr Class loader.
+ * @return Class.
+ * @throws ClassNotFoundException If class not found.
+ */
+ public static Class<?> forName(
+ String clsName,
+ @Nullable ClassLoader ldr,
+ Predicate<String> clsFilter
+ ) throws ClassNotFoundException {
+ assert clsName != null;
+
+ Class<?> cls = primitiveMap.get(clsName);
+
+ if (cls != null)
+ return cls;
+
+ if (ldr == null)
+ ldr = igniteClassLoader;
+
+ ConcurrentMap<String, Class> ldrMap = classCache.get(ldr);
+
+ if (ldrMap == null) {
+ ConcurrentMap<String, Class> old = classCache.putIfAbsent(ldr, ldrMap = new ConcurrentHashMap<>());
+
+ if (old != null)
+ ldrMap = old;
+ }
+
+ cls = ldrMap.get(clsName);
+
+ if (cls == null) {
+ if (clsFilter != null && !clsFilter.test(clsName))
+ throw new ClassNotFoundException("Deserialization of class " + clsName + " is disallowed.");
+
+ cls = Class.forName(clsName, true, ldr);
+
+ Class old = ldrMap.putIfAbsent(clsName, cls);
+
+ if (old != null)
+ cls = old;
+ }
+
+ return cls;
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
new file mode 100644
index 0000000..5a43f2c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Utility class for tests.
+ */
+public final class IgniteTestUtils {
+ /**
+ * Set object field value via reflection.
+ *
+ * @param obj Object to set field value to.
+ * @param fieldName Field name to set value for.
+ * @param val New field value.
+ * @throws IgniteInternalException In case of error.
+ */
+ public static void setFieldValue(Object obj, String fieldName, Object val) throws IgniteInternalException {
+ assert obj != null;
+ assert fieldName != null;
+
+ try {
+ Class<?> cls = obj instanceof Class ? (Class)obj : obj.getClass();
+
+ Field field = cls.getDeclaredField(fieldName);
+
+ boolean isFinal = (field.getModifiers() & Modifier.FINAL) != 0;
+
+ boolean isStatic = (field.getModifiers() & Modifier.STATIC) != 0;
+
+ /**
+ * http://java.sun.com/docs/books/jls/third_edition/html/memory.html#17.5.3
+ * If a final field is initialized to a compile-time constant in the field declaration,
+ * changes to the final field may not be observed.
+ */
+ if (isFinal && isStatic)
+ throw new IgniteInternalException("Modification of static final field through reflection.");
+
+ boolean accessible = field.isAccessible();
+
+ if (!accessible)
+ field.setAccessible(true);
+
+ field.set(obj, val);
+ }
+ catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new IgniteInternalException("Failed to set object field [obj=" + obj + ", field=" + fieldName + ']', e);
+ }
+ }
+
+ /**
+ * Set object field value via reflection.
+ *
+ * @param obj Object to set field value to.
+ * @param cls Class to get field from.
+ * @param fieldName Field name to set value for.
+ * @param val New field value.
+ * @throws IgniteInternalException In case of error.
+ */
+ public static void setFieldValue(Object obj, Class cls, String fieldName, Object val) throws IgniteInternalException {
+ assert fieldName != null;
+
+ try {
+ Field field = cls.getDeclaredField(fieldName);
+
+ boolean accessible = field.isAccessible();
+
+ if (!accessible)
+ field.setAccessible(true);
+
+ boolean isFinal = (field.getModifiers() & Modifier.FINAL) != 0;
+
+ boolean isStatic = (field.getModifiers() & Modifier.STATIC) != 0;
+
+ /**
+ * http://java.sun.com/docs/books/jls/third_edition/html/memory.html#17.5.3
+ * If a final field is initialized to a compile-time constant in the field declaration,
+ * changes to the final field may not be observed.
+ */
+ if (isFinal && isStatic)
+ throw new IgniteInternalException("Modification of static final field through reflection.");
+
+ if (isFinal) {
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+
+ modifiersField.setAccessible(true);
+
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+ }
+
+ field.set(obj, val);
+ }
+ catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new IgniteInternalException("Failed to set object field [obj=" + obj + ", field=" + fieldName + ']', e);
+ }
+ }
+}