You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/01/17 15:48:14 UTC
[ignite] branch master updated: IGNITE-10307: SQL: Partition
pruning for joins. This closes #5774.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b03a970 IGNITE-10307: SQL: Partition pruning for joins. This closes #5774.
b03a970 is described below
commit b03a9705b144196bfde631328562008eb9fa48a6
Author: devozerov <vo...@gridgain.com>
AuthorDate: Thu Jan 17 18:47:59 2019 +0300
IGNITE-10307: SQL: Partition pruning for joins. This closes #5774.
---
.../processors/query/h2/IgniteH2Indexing.java | 110 +-
...ode.java => PartitionAffinityFunctionType.java} | 35 +-
.../query/h2/affinity/PartitionAllNode.java | 5 +
.../query/h2/affinity/PartitionCompositeNode.java | 114 +-
.../query/h2/affinity/PartitionConstantNode.java | 6 +-
.../query/h2/affinity/PartitionExtractor.java | 401 ++++--
.../query/h2/affinity/PartitionGroupNode.java | 23 +-
.../query/h2/affinity/PartitionJoinCondition.java | 132 ++
.../query/h2/affinity/PartitionJoinGroup.java | 81 ++
.../query/h2/affinity/PartitionNode.java | 5 +
.../query/h2/affinity/PartitionNoneNode.java | 5 +
.../query/h2/affinity/PartitionParameterNode.java | 2 +-
.../query/h2/affinity/PartitionResult.java | 25 +-
.../query/h2/affinity/PartitionSingleNode.java | 21 +-
.../query/h2/affinity/PartitionTable.java | 113 ++
.../affinity/PartitionTableAffinityDescriptor.java | 97 ++
.../h2/affinity/PartitionTableDescriptor.java | 73 --
.../query/h2/affinity/PartitionTableModel.java | 157 +++
.../processors/query/h2/opt/GridH2Table.java | 161 ++-
.../query/h2/sql/GridSqlQuerySplitter.java | 20 +-
.../query/h2/twostep/GridReduceQueryExecutor.java | 5 +-
.../BetweenOperationExtractPartitionSelfTest.java | 18 -
.../h2/twostep/JoinPartitionPruningSelfTest.java | 1303 ++++++++++++++++++++
.../IgniteBinaryCacheQueryTestSuite.java | 2 +
24 files changed, 2591 insertions(+), 323 deletions(-)
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 0c1b1d6..5e00aea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -92,7 +92,8 @@ import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
import org.apache.ignite.internal.processors.query.RunningQueryManager;
import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
-import org.apache.ignite.internal.processors.query.h2.affinity.PartitionNode;
+import org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor;
+import org.apache.ignite.internal.processors.query.h2.affinity.PartitionResult;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeClientIndex;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerIO;
@@ -139,6 +140,7 @@ import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
@@ -248,6 +250,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
private DdlStatementsProcessor ddlProc;
+ /** Partition extractor. */
+ private PartitionExtractor partExtractor;
+
/** */
private final RunningQueryManager runningQueryMgr = new RunningQueryManager();
@@ -2006,46 +2011,27 @@ public class IgniteH2Indexing implements GridQueryIndexing {
boolean cursorCreated = false;
try {
- // TODO: Use intersection (https://issues.apache.org/jira/browse/IGNITE-10567)
- int partitions[] = qry.getPartitions();
-
- if (partitions == null && twoStepQry.derivedPartitions() != null) {
- try {
- PartitionNode partTree = twoStepQry.derivedPartitions().tree();
-
- Collection<Integer> partitions0 = partTree.apply(qry.getArgs());
-
- if (F.isEmpty(partitions0))
- partitions = new int[0];
- else {
- partitions = new int[partitions0.size()];
-
- int i = 0;
-
- for (Integer part : partitions0)
- partitions[i++] = part;
- }
-
- if (partitions.length == 0) { //here we know that result of requested query is empty
- return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() {
- @Override public Iterator<List<?>> iterator() {
- return new Iterator<List<?>>() {
- @Override public boolean hasNext() {
- return false;
- }
+ // When explicit partitions are set, there must be an owning cache they should be applied to.
+ int explicitParts[] = qry.getPartitions();
+ PartitionResult derivedParts = twoStepQry.derivedPartitions();
+
+ int parts[] = calculatePartitions(explicitParts, derivedParts, qry.getArgs());
+
+ if (parts != null && parts.length == 0) {
+ return new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ return new Iterator<List<?>>() {
+ @Override public boolean hasNext() {
+ return false;
+ }
- @Override public List<?> next() {
- return null;
- }
- };
+ @SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException")
+ @Override public List<?> next() {
+ return null;
}
- });
+ };
}
- }
- catch (IgniteCheckedException e) {
- throw new CacheException("Failed to calculate derived partitions: [qry=" + qry.getSql() +
- ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e);
- }
+ });
}
Iterable<List<?>> iter = runQueryTwoStep(
@@ -2057,7 +2043,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
qry.getTimeout(),
cancel,
qry.getArgs(),
- partitions,
+ parts,
qry.isLazy(),
mvccTracker
);
@@ -2079,6 +2065,43 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * Calculate partitions for the query.
+ *
+ * @param explicitParts Explicit partitions provided in SqlFieldsQuery.partitions property.
+ * @param derivedParts Derived partitions found during partition pruning.
+ * @param args Arguments.
+ * @return Calculated partitions or {@code null} if failed to calculate and there should be a broadcast.
+ */
+ @SuppressWarnings("ZeroLengthArrayAllocation")
+ private int[] calculatePartitions(int[] explicitParts, PartitionResult derivedParts, Object[] args) {
+ if (!F.isEmpty(explicitParts))
+ return explicitParts;
+ else if (derivedParts != null) {
+ try {
+ Collection<Integer> realParts = derivedParts.tree().apply(args);
+
+ if (F.isEmpty(realParts))
+ return IgniteUtils.EMPTY_INTS;
+ else {
+ int[] realParts0 = new int[realParts.size()];
+
+ int i = 0;
+
+ for (Integer realPart : realParts)
+ realParts0[i++] = realPart;
+
+ return realParts0;
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheException("Failed to calculate derived partitions for query.", e);
+ }
+ }
+
+ return null;
+ }
+
+ /**
* Do initial parsing of the statement and create query caches, if needed.
* @param c Connection.
* @param sqlQry Query.
@@ -2375,6 +2398,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
dmlProc = new DmlStatementsProcessor(ctx, this);
ddlProc = new DdlStatementsProcessor(ctx, schemaMgr);
+ partExtractor = new PartitionExtractor(this);
+
if (JdbcUtils.serializer != null)
U.warn(log, "Custom H2 serialization is already configured, will override.");
@@ -2670,6 +2695,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * @return Partition extractor.
+ */
+ public PartitionExtractor partitionExtractor() {
+ return partExtractor;
+ }
+
+ /**
* Collect cache identifiers from two-step query.
*
* @param mainCacheId Id of main cache.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAffinityFunctionType.java
similarity index 62%
copy from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java
copy to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAffinityFunctionType.java
index 238739c..4c88fcb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAffinityFunctionType.java
@@ -17,29 +17,32 @@
package org.apache.ignite.internal.processors.query.h2.affinity;
-import org.apache.ignite.IgniteCheckedException;
-
-import java.util.Collection;
-
/**
- * Common node of partition tree.
+ * Affinity function type.
*/
-public interface PartitionNode {
+public enum PartitionAffinityFunctionType {
+ /** Custom affinity function. */
+ CUSTOM(0),
+
+ /** Rendezvous affinity function. */
+ RENDEZVOUS(1);
+
+ /** Value. */
+ private final int val;
+
/**
- * Get partitions.
+ * Constructor.
*
- * @param args Query arguments.
- * @return Partitions.
- * @throws IgniteCheckedException If failed.
+ * @param val Value.
*/
- Collection<Integer> apply(Object... args) throws IgniteCheckedException;
+ PartitionAffinityFunctionType(int val) {
+ this.val = val;
+ }
/**
- * Try optimizing partition nodes into a simpler form.
- *
- * @return Optimized node or {@code this} if optimization failed.
+ * @return Value.
*/
- default PartitionNode optimize() {
- return this;
+ public int value() {
+ return val;
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAllNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAllNode.java
index 842d82c..30860f5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAllNode.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAllNode.java
@@ -41,6 +41,11 @@ public class PartitionAllNode implements PartitionNode {
}
/** {@inheritDoc} */
+ @Override public int joinGroup() {
+ return PartitionTableModel.GRP_NONE;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(PartitionAllNode.class, this);
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionCompositeNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionCompositeNode.java
index 2cb330f..45ceaaf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionCompositeNode.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionCompositeNode.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.affinity;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import java.util.Collection;
@@ -65,6 +66,8 @@ public class PartitionCompositeNode implements PartitionNode {
return null;
// (A, B) and (B, C) -> (B)
+ leftParts = new HashSet<>(leftParts);
+
leftParts.retainAll(rightParts);
}
else {
@@ -77,6 +80,8 @@ public class PartitionCompositeNode implements PartitionNode {
return leftParts;
// (A, B) or (B, C) -> (A, B, C)
+ leftParts = new HashSet<>(leftParts);
+
leftParts.addAll(rightParts);
}
@@ -84,6 +89,12 @@ public class PartitionCompositeNode implements PartitionNode {
}
/** {@inheritDoc} */
+ @Override public int joinGroup() {
+ // Similar to group node, we cannot cache join group value here as it may be changed dynamically.
+ return left.joinGroup();
+ }
+
+ /** {@inheritDoc} */
@Override public PartitionNode optimize() {
PartitionNode left = this.left;
PartitionNode right = this.right;
@@ -103,9 +114,15 @@ public class PartitionCompositeNode implements PartitionNode {
return optimizeSpecial(right, left);
// If one of child nodes cannot be optimized, nothing can be done further.
- // Note that we cannot return "this" here because left or right parts might have been optimized.
- if (left instanceof PartitionCompositeNode || right instanceof PartitionCompositeNode)
+ // Note that we cannot return "this" here because left or right parts might have been changed.
+ if (left instanceof PartitionCompositeNode || right instanceof PartitionCompositeNode) {
+ // Should be "NONE" for AND in fact, but this would violate current non-collocated join semantics as
+ // explained in "optimizeSimpleAnd" method below.
+ if (left.joinGroup() != right.joinGroup())
+ return PartitionAllNode.INSTANCE;
+
return new PartitionCompositeNode(left, right, op);
+ }
// Try optimizing composite nodes.
if (left instanceof PartitionGroupNode)
@@ -182,6 +199,11 @@ public class PartitionCompositeNode implements PartitionNode {
private PartitionNode optimizeGroupAnd(PartitionGroupNode left, PartitionNode right) {
assert op == PartitionCompositeNodeOperator.AND;
+ // Should be "NONE" for AND in fact, but this would violate current non-collocated join semantics as
+ // explained in "optimizeSimpleAnd" method below.
+ if (left.joinGroup() != right.joinGroup())
+ return PartitionAllNode.INSTANCE;
+
// Optimistic check whether both sides are equal.
if (right instanceof PartitionGroupNode) {
PartitionGroupNode right0 = (PartitionGroupNode)right;
@@ -206,22 +228,50 @@ public class PartitionCompositeNode implements PartitionNode {
}
if (rightConsts != null) {
- // {A, B) and (B, C) -> (B).
- consts.retainAll(rightConsts);
-
- if (consts.isEmpty())
- // {A, B) and (C, D) -> NONE.
- return PartitionNoneNode.INSTANCE;
- else if (consts.size() == 1)
+ // Try to merge nodes if they belong to the same table.
+ boolean sameTbl = true;
+ String curTblAlias = null;
+
+ for (PartitionSingleNode curConst : consts) {
+ if (curTblAlias == null)
+ curTblAlias = curConst.table().alias();
+ else if (!F.eq(curTblAlias, curConst.table().alias())) {
+ sameTbl = false;
+
+ break;
+ }
+ }
+
+ if (sameTbl) {
+ for (PartitionSingleNode curConst : rightConsts) {
+ if (curTblAlias == null)
+ curTblAlias = curConst.table().alias();
+ else if (!F.eq(curTblAlias, curConst.table().alias())) {
+ sameTbl = false;
+
+ break;
+ }
+ }
+ }
+
+ if (sameTbl) {
// {A, B) and (B, C) -> (B).
- return consts.iterator().next();
- else
- // {A, B, C) and (B, C, D) -> (B, C).
- return new PartitionGroupNode(consts);
+ consts.retainAll(rightConsts);
+
+ if (consts.isEmpty())
+ // {A, B) and (C, D) -> NONE.
+ return PartitionNoneNode.INSTANCE;
+ else if (consts.size() == 1)
+ // {A, B) and (B, C) -> (B).
+ return consts.iterator().next();
+ else
+ // {A, B, C) and (B, C, D) -> (B, C).
+ return new PartitionGroupNode(consts);
+ }
}
}
- // Otherwise it is a mixed set of concrete partitions and arguments. Cancel optimization.
+ // Otherwise it is a mixed set of concrete partitions and arguments possibly from different caches.
// Note that in fact we can optimize expression to certain extent (e.g. (A) and (B, :C) -> (A) and (:C)),
// but resulting expression is always composite node still, which cannot be optimized on upper levels.
// So we skip any fine-grained optimization in favor of simplicity.
@@ -238,6 +288,10 @@ public class PartitionCompositeNode implements PartitionNode {
private PartitionNode optimizeGroupOr(PartitionGroupNode left, PartitionNode right) {
assert op == PartitionCompositeNodeOperator.OR;
+ // Cannot merge disjunctive nodes if they belong to different join groups.
+ if (left.joinGroup() != right.joinGroup())
+ return PartitionAllNode.INSTANCE;
+
HashSet<PartitionSingleNode> siblings = new HashSet<>(left.siblings());
if (right instanceof PartitionSingleNode)
@@ -278,18 +332,28 @@ public class PartitionCompositeNode implements PartitionNode {
private PartitionNode optimizeSimpleAnd(PartitionSingleNode left, PartitionSingleNode right) {
assert op == PartitionCompositeNodeOperator.AND;
+ // Currently we do not merge such nodes because it may violate existing broken (!!!) join semantics.
+ // Normally, if we have two non-collocated partition sets, then this should be an empty set for collocated
+ // query mode. Unfortunately, current semantics of collocated query mode assume that even though both sides
+ // of expression are located on random nodes, there is a slight chance that they may accidentally reside on
+ // a single node and hence return some rows. We return "ALL" here to keep this broken semantics consistent
+ // irrespective of whether partition pruning is used or not. Once non-collocated joins are fixed, this
+ // condition will be changed to "NONE".
+ if (left.joinGroup() != right.joinGroup())
+ return PartitionAllNode.INSTANCE;
+
// Check if both sides are equal.
if (left.equals(right))
// (X) and (X) -> X
// (:X) and (:X) -> :X
return left;
- // If both sides are constants, and they are not equal, this is empty set.
- if (left.constant() && right.constant())
+ // If both sides are constants from the same table and they are not equal, this is empty set.
+ if (left.constant() && right.constant() && F.eq(left.table().alias(), right.tbl.alias()))
// X and Y -> NONE
return PartitionNoneNode.INSTANCE;
- // Otherwise it is a mixed set, cannot reduce.
+ // Otherwise this is a mixed set, cannot reduce.
// X and :Y -> (X) AND (:Y)
return new PartitionCompositeNode(left, right, PartitionCompositeNodeOperator.AND);
}
@@ -304,7 +368,21 @@ public class PartitionCompositeNode implements PartitionNode {
private PartitionNode optimizeSimpleOr(PartitionSingleNode left, PartitionSingleNode right) {
assert op == PartitionCompositeNodeOperator.OR;
- return left.equals(right) ? left : PartitionGroupNode.merge(left, right);
+ // Cannot merge disjunctive nodes if they belong to different join groups.
+ if (left.joinGroup() != right.joinGroup())
+ return PartitionAllNode.INSTANCE;
+
+ // (A) or (A) -> (A)
+ if (left.equals(right))
+ return left;
+
+ // (A) or (B) -> (A, B)
+ HashSet<PartitionSingleNode> nodes = new HashSet<>();
+
+ nodes.add(left);
+ nodes.add(right);
+
+ return new PartitionGroupNode(nodes);
}
/** {@inheritDoc} */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionConstantNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionConstantNode.java
index 9efafe4..9e258ae 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionConstantNode.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionConstantNode.java
@@ -29,11 +29,11 @@ public class PartitionConstantNode extends PartitionSingleNode {
/**
* Constructor.
*
- * @param resolver Resolver.
+ * @param tbl Table.
* @param part Partition.
*/
- public PartitionConstantNode(PartitionTableDescriptor resolver, int part) {
- super(resolver);
+ public PartitionConstantNode(PartitionTable tbl, int part) {
+ super(tbl);
this.part = part;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionExtractor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionExtractor.java
index 12549ce..0898e63 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionExtractor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionExtractor.java
@@ -21,8 +21,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias;
@@ -30,6 +34,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlColumn;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlElement;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlJoin;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperation;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlParameter;
@@ -41,6 +46,9 @@ import org.h2.table.Column;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+
/**
* Partition tree extractor.
*/
@@ -84,14 +92,11 @@ public class PartitionExtractor {
GridSqlSelect select = (GridSqlSelect)qry;
- // Currently we can extract data only from a single table.
- GridSqlTable tbl = unwrapTable(select.from());
-
- if (tbl == null)
- return null;
+ // Prepare table model.
+ PartitionTableModel tblModel = prepareTableModel(select.from());
// Do extract.
- PartitionNode tree = extractFromExpression(select.where());
+ PartitionNode tree = extractFromExpression(select.where(), tblModel, false);
assert tree != null;
@@ -101,10 +106,8 @@ public class PartitionExtractor {
if (tree instanceof PartitionAllNode)
return null;
- // Return.
- PartitionTableDescriptor desc = descriptor(tbl.dataTable());
-
- return new PartitionResult(desc, tree);
+ // Done.
+ return new PartitionResult(tree, tblModel.joinGroupAffinity(tree.joinGroup()));
}
/**
@@ -114,21 +117,25 @@ public class PartitionExtractor {
* @return Partition result or {@code null} if nothing is resolved.
*/
@SuppressWarnings("IfMayBeConditional")
- public PartitionResult merge(List<GridCacheSqlQuery> qrys) {
+ public PartitionResult mergeMapQueries(List<GridCacheSqlQuery> qrys) {
// Check if merge is possible.
- PartitionTableDescriptor desc = null;
+ PartitionTableAffinityDescriptor aff = null;
for (GridCacheSqlQuery qry : qrys) {
PartitionResult qryRes = (PartitionResult)qry.derivedPartitions();
+ // Failed to get results for one query -> broadcast.
if (qryRes == null)
- // Failed to get results for one query -> broadcast.
return null;
- if (desc == null)
- desc = qryRes.descriptor();
- else if (!F.eq(desc, qryRes.descriptor()))
- // Queries refer to different tables, cannot merge -> broadcast.
+ // This is only possible if query is resolved to "NONE". Will be skipped later during map request prepare.
+ if (qryRes.affinity() == null)
+ continue;
+
+ if (aff == null)
+ aff = qryRes.affinity();
+ else if (!aff.isCompatible(qryRes.affinity()))
+ // Queries refer to incompatible affinity groups, cannot merge -> broadcast.
return null;
}
@@ -152,33 +159,232 @@ public class PartitionExtractor {
if (tree instanceof PartitionAllNode)
return null;
- return new PartitionResult(desc, tree);
+ // If there is no affinity, then we assume "NONE" result.
+ assert aff != null || tree == PartitionNoneNode.INSTANCE;
+
+ return new PartitionResult(tree, aff);
}
/**
- * Try unwrapping the table.
+ * Prepare table model.
*
- * @param from From.
- * @return Table or {@code null} if not a table.
+ * @param from FROM clause.
+ * @return Join model.
*/
- @Nullable private static GridSqlTable unwrapTable(GridSqlAst from) {
- if (from instanceof GridSqlAlias)
- from = from.child();
+ private PartitionTableModel prepareTableModel(GridSqlAst from) {
+ PartitionTableModel res = new PartitionTableModel();
+
+ prepareTableModel0(from, res);
- if (from instanceof GridSqlTable)
- return (GridSqlTable)from;
+ return res;
+ }
+
+ /**
+ * Prepare tables which will be used in join model.
+ *
+ * @param from FROM clause element (join or single aliased table expression).
+ * @param model Table model.
+ * @return Tables extracted from the element (only left-side tables for LEFT JOIN), or empty list if none.
+ */
+ private List<PartitionTable> prepareTableModel0(GridSqlAst from, PartitionTableModel model) {
+ if (from instanceof GridSqlJoin) {
+ // Process JOIN recursively.
+ GridSqlJoin join = (GridSqlJoin)from;
+
+ List<PartitionTable> leftTbls = prepareTableModel0(join.leftTable(), model);
+ List<PartitionTable> rightTbls = prepareTableModel0(join.rightTable(), model);
+
+ if (join.isLeftOuter()) {
+ // "a LEFT JOIN b" is transformed into "a", and "b" is put into special stop-list.
+ // If a condition is met on "b" afterwards, we will ignore it.
+ for (PartitionTable rightTbl : rightTbls)
+ model.addExcludedTable(rightTbl.alias());
+
+ return leftTbls;
+ }
+
+ // Extract equi-join or cross-join from condition. For normal INNER JOINs most likely we will have "1=1"
+ // cross join here, real join condition will be found in WHERE clause later.
+ PartitionJoinCondition cond = parseJoinCondition(join.on());
+
+ if (cond != null && !cond.cross())
+ model.addJoin(cond);
+
+ ArrayList<PartitionTable> res = new ArrayList<>(leftTbls.size() + rightTbls.size());
+
+ res.addAll(leftTbls);
+ res.addAll(rightTbls);
+
+ return res;
+ }
+
+ PartitionTable tbl = prepareTable(from, model);
+
+ return tbl != null ? Collections.singletonList(tbl) : Collections.emptyList();
+ }
+
+ /**
+ * Try parsing condition as simple JOIN condition. Only equijoins are supported for now, so anything more complex
+ * than "A.a = B.b" is not processed.
+ *
+ * @param on Initial AST.
+ * @return Join condition or {@code null} if not simple equijoin.
+ */
+ private static PartitionJoinCondition parseJoinCondition(GridSqlElement on) {
+ if (on instanceof GridSqlOperation) {
+ GridSqlOperation on0 = (GridSqlOperation)on;
+
+ if (on0.operationType() == GridSqlOperationType.EQUAL) {
+ // Check for cross-join first.
+ GridSqlConst leftConst = unwrapConst(on0.child(0));
+ GridSqlConst rightConst = unwrapConst(on0.child(1));
+
+ if (leftConst != null && rightConst != null) {
+ try {
+ int leftConstval = leftConst.value().getInt();
+ int rightConstVal = rightConst.value().getInt();
+
+ if (leftConstval == rightConstVal)
+ return PartitionJoinCondition.CROSS;
+ }
+ catch (Exception ignore) {
+ // No-op.
+ }
+ }
+
+ // This is neither a cross-join nor a normal join between columns.
+ if (leftConst != null || rightConst != null)
+ return null;
+
+ // Check for normal equi-join.
+ GridSqlColumn left = unwrapColumn(on0.child(0));
+ GridSqlColumn right = unwrapColumn(on0.child(1));
+
+ if (left != null && right != null) {
+ String leftAlias = left.tableAlias();
+ String rightAlias = right.tableAlias();
+
+ String leftCol = left.columnName();
+ String rightCol = right.columnName();
+
+ return new PartitionJoinCondition(leftAlias, rightAlias, leftCol, rightCol);
+ }
+ }
+ }
return null;
}
/**
+ * Prepare single table.
+ *
+ * @param from Expression.
+ * @param tblModel Table model.
+ * @return Added table or {@code null} if table is excluded from the model.
+ */
+ private static PartitionTable prepareTable(GridSqlAst from, PartitionTableModel tblModel) {
+ // Unwrap alias. We assume that every table must be aliased.
+ assert from instanceof GridSqlAlias;
+
+ String alias = ((GridSqlAlias)from).alias();
+
+ from = from.child();
+
+ if (from instanceof GridSqlTable) {
+ // Normal table.
+ GridSqlTable from0 = (GridSqlTable)from;
+
+ GridH2Table tbl0 = from0.dataTable();
+
+ // Unknown table type, e.g. temp table.
+ if (tbl0 == null) {
+ tblModel.addExcludedTable(alias);
+
+ return null;
+ }
+
+ String cacheName = tbl0.cacheName();
+
+ String affColName = null;
+ String secondAffColName = null;
+
+ for (Column col : tbl0.getColumns()) {
+ if (tbl0.isColumnForPartitionPruningStrict(col)) {
+ if (affColName == null)
+ affColName = col.getName();
+ else {
+ secondAffColName = col.getName();
+
+ // Break as we cannot have more than two affinity key columns.
+ break;
+ }
+ }
+ }
+
+ PartitionTable tbl = new PartitionTable(alias, cacheName, affColName, secondAffColName);
+
+ PartitionTableAffinityDescriptor aff = affinityForCache(tbl0.cacheInfo().config());
+
+ if (aff == null) {
+ // Non-standard affinity, exclude table.
+ tblModel.addExcludedTable(alias);
+
+ return null;
+ }
+
+ tblModel.addTable(tbl, aff);
+
+ return tbl;
+ }
+ else {
+ // Subquery/union/view, etc.
+ assert alias != null;
+
+ tblModel.addExcludedTable(alias);
+
+ return null;
+ }
+ }
+
+ /**
+ * Prepare affinity identifier for cache.
+ *
+ * @param ccfg Cache configuration.
+ * @return Affinity identifier.
+ */
+ private static PartitionTableAffinityDescriptor affinityForCache(CacheConfiguration ccfg) {
+ // Partition could be extracted only from PARTITIONED caches.
+ if (ccfg.getCacheMode() != CacheMode.PARTITIONED)
+ return null;
+
+ PartitionAffinityFunctionType aff = ccfg.getAffinity().getClass().equals(RendezvousAffinityFunction.class) ?
+ PartitionAffinityFunctionType.RENDEZVOUS : PartitionAffinityFunctionType.CUSTOM;
+
+ boolean hasNodeFilter = ccfg.getNodeFilter() != null &&
+ !(ccfg.getNodeFilter() instanceof CacheConfiguration.IgniteAllNodesPredicate);
+
+ return new PartitionTableAffinityDescriptor(
+ aff,
+ ccfg.getAffinity().partitions(),
+ hasNodeFilter,
+ ccfg.getDataRegionName()
+ );
+ }
+
+ /**
* Extract partitions from expression.
*
* @param expr Expression.
+ * @param tblModel Table model.
+ * @param disjunct Whether current processing frame is located under disjunction ("OR"). In this case we cannot
+ * rely on join expressions like (A.a = B.b) to build co-location model because another conflicting
+ * join expression on the same tables might be located on the other side of the "OR".
+ * Example: "JOIN ON A.a = B.b OR A.a > B.b".
* @return Partition tree.
*/
@SuppressWarnings("EnumSwitchStatementWhichMissesCases")
- private PartitionNode extractFromExpression(GridSqlAst expr) throws IgniteCheckedException {
+ private PartitionNode extractFromExpression(GridSqlAst expr, PartitionTableModel tblModel, boolean disjunct)
+ throws IgniteCheckedException {
PartitionNode res = PartitionAllNode.INSTANCE;
if (expr instanceof GridSqlOperation) {
@@ -186,22 +392,22 @@ public class PartitionExtractor {
switch (op.operationType()) {
case AND:
- res = extractFromAnd(op);
+ res = extractFromAnd(op, tblModel, disjunct);
break;
case OR:
- res = extractFromOr(op);
+ res = extractFromOr(op, tblModel);
break;
case IN:
- res = extractFromIn(op);
+ res = extractFromIn(op, tblModel);
break;
case EQUAL:
- res = extractFromEqual(op);
+ res = extractFromEqual(op, tblModel, disjunct);
}
}
@@ -213,18 +419,21 @@ public class PartitionExtractor {
* Extract partition information from AND.
*
* @param op Operation.
+ * @param tblModel Table model.
+ * @param disjunct Disjunction marker.
* @return Partition.
*/
- private PartitionNode extractFromAnd(GridSqlOperation op) throws IgniteCheckedException {
+ private PartitionNode extractFromAnd(GridSqlOperation op, PartitionTableModel tblModel, boolean disjunct)
+ throws IgniteCheckedException {
assert op.size() == 2;
- PartitionNode betweenNodes = tryExtractBetween(op);
+ PartitionNode betweenNodes = tryExtractBetween(op, tblModel);
if (betweenNodes != null)
return betweenNodes;
- PartitionNode part1 = extractFromExpression(op.child(0));
- PartitionNode part2 = extractFromExpression(op.child(1));
+ PartitionNode part1 = extractFromExpression(op.child(0), tblModel, disjunct);
+ PartitionNode part2 = extractFromExpression(op.child(1), tblModel, disjunct);
return new PartitionCompositeNode(part1, part2, PartitionCompositeNodeOperator.AND);
}
@@ -233,13 +442,16 @@ public class PartitionExtractor {
* Extract partition information from OR.
*
* @param op Operation.
+ * @param tblModel Table model.
* @return Partition.
*/
- private PartitionNode extractFromOr(GridSqlOperation op) throws IgniteCheckedException {
+ private PartitionNode extractFromOr(GridSqlOperation op, PartitionTableModel tblModel)
+ throws IgniteCheckedException {
assert op.size() == 2;
- PartitionNode part1 = extractFromExpression(op.child(0));
- PartitionNode part2 = extractFromExpression(op.child(1));
+ // Parse inner expressions recursively with disjunction flag set.
+ PartitionNode part1 = extractFromExpression(op.child(0), tblModel, true);
+ PartitionNode part2 = extractFromExpression(op.child(1), tblModel, true);
return new PartitionCompositeNode(part1, part2, PartitionCompositeNodeOperator.OR);
}
@@ -248,9 +460,11 @@ public class PartitionExtractor {
* Extract partition information from IN.
*
* @param op Operation.
+ * @param tblModel Table model.
* @return Partition.
*/
- private PartitionNode extractFromIn(GridSqlOperation op) throws IgniteCheckedException {
+ private PartitionNode extractFromIn(GridSqlOperation op, PartitionTableModel tblModel)
+ throws IgniteCheckedException {
// Operation should contain at least two children: left (column) and right (const or column).
if (op.size() < 2)
return PartitionAllNode.INSTANCE;
@@ -258,11 +472,9 @@ public class PartitionExtractor {
// Left operand should be column.
GridSqlAst left = op.child();
- GridSqlColumn leftCol;
+ GridSqlColumn leftCol = unwrapColumn(left);
- if (left instanceof GridSqlColumn)
- leftCol = (GridSqlColumn)left;
- else
+ if (leftCol == null)
return PartitionAllNode.INSTANCE;
// Can work only with Ignite tables.
@@ -291,8 +503,8 @@ public class PartitionExtractor {
// set globally. Hence, returning null.
return PartitionAllNode.INSTANCE;
- // Do extract.
- PartitionSingleNode part = extractSingle(leftCol.column(), rightConst, rightParam);
+ // Extract.
+ PartitionSingleNode part = extractSingle(leftCol, rightConst, rightParam, tblModel);
// Same thing as above: single unknown partition in disjunction defeats optimization.
if (part == null)
@@ -308,19 +520,20 @@ public class PartitionExtractor {
* Extract partition information from equality.
*
* @param op Operation.
+ * @param tblModel Table model.
+ * @param disjunct Disjunction flag. When set, a possible join expression will not be processed.
* @return Partition.
*/
- private PartitionNode extractFromEqual(GridSqlOperation op) throws IgniteCheckedException {
+ private PartitionNode extractFromEqual(GridSqlOperation op, PartitionTableModel tblModel, boolean disjunct)
+ throws IgniteCheckedException {
assert op.operationType() == GridSqlOperationType.EQUAL;
GridSqlElement left = op.child(0);
GridSqlElement right = op.child(1);
- GridSqlColumn leftCol;
+ GridSqlColumn leftCol = unwrapColumn(left);
- if (left instanceof GridSqlColumn)
- leftCol = (GridSqlColumn)left;
- else
+ if (leftCol == null)
return PartitionAllNode.INSTANCE;
if (!(leftCol.column().getTable() instanceof GridH2Table))
@@ -337,10 +550,20 @@ public class PartitionExtractor {
rightConst = null;
rightParam = (GridSqlParameter)right;
}
- else
+ else {
+ if (right instanceof GridSqlColumn) {
+ if (!disjunct) {
+ PartitionJoinCondition cond = parseJoinCondition(op);
+
+ if (cond != null && !cond.cross())
+ tblModel.addJoin(cond);
+ }
+
+ }
return PartitionAllNode.INSTANCE;
+ }
- PartitionSingleNode part = extractSingle(leftCol.column(), rightConst, rightParam);
+ PartitionSingleNode part = extractSingle(leftCol, rightConst, rightParam, tblModel);
return part != null ? part : PartitionAllNode.INSTANCE;
}
@@ -351,52 +574,81 @@ public class PartitionExtractor {
* @param leftCol Left column.
* @param rightConst Right constant.
* @param rightParam Right parameter.
+ * @param tblModel Table model.
* @return Partition or {@code null} if failed to extract.
*/
- @Nullable private PartitionSingleNode extractSingle(Column leftCol, GridSqlConst rightConst,
- GridSqlParameter rightParam) throws IgniteCheckedException {
+ @Nullable private PartitionSingleNode extractSingle(
+ GridSqlColumn leftCol,
+ GridSqlConst rightConst,
+ GridSqlParameter rightParam,
+ PartitionTableModel tblModel
+ ) throws IgniteCheckedException {
assert leftCol != null;
- assert leftCol.getTable() != null;
- assert leftCol.getTable() instanceof GridH2Table;
- GridH2Table tbl = (GridH2Table)leftCol.getTable();
+ Column leftCol0 = leftCol.column();
- if (!tbl.isColumnForPartitionPruning(leftCol))
+ assert leftCol0.getTable() != null;
+ assert leftCol0.getTable() instanceof GridH2Table;
+
+ GridH2Table tbl = (GridH2Table)leftCol0.getTable();
+
+ if (!tbl.isColumnForPartitionPruning(leftCol0))
return null;
- PartitionTableDescriptor tblDesc = descriptor(tbl);
+ PartitionTable tbl0 = tblModel.table(leftCol.tableAlias());
+
+ // If table is in ignored set, then we cannot use it for partition extraction.
+ if (tbl0 == null)
+ return null;
if (rightConst != null) {
- int part = idx.kernalContext().affinity().partition(tbl.cacheName(), rightConst.value().getObject());
+ Object constVal = H2Utils.convert(rightConst.value().getObject(), idx, leftCol0.getType());
+
+ int part = idx.kernalContext().affinity().partition(tbl.cacheName(), constVal);
- return new PartitionConstantNode(tblDesc, part);
+ return new PartitionConstantNode(tbl0, part);
}
else if (rightParam != null)
- return new PartitionParameterNode(tblDesc, idx, rightParam.index(), leftCol.getType());
+ return new PartitionParameterNode(tbl0, idx, rightParam.index(), leftCol0.getType());
else
return null;
}
/**
- * Get descriptor from table.
+ * Unwrap constant if possible.
+ *
+ * @param ast AST.
+ * @return Constant or {@code null} if not a constant.
+ */
+ @Nullable public static GridSqlConst unwrapConst(GridSqlAst ast) {
+ return ast instanceof GridSqlConst ? (GridSqlConst)ast : null;
+ }
+
+ /**
+ * Unwrap column if possible.
*
- * @param tbl Table.
- * @return Descriptor.
+ * @param ast AST.
+ * @return Column or {@code null} if not a column.
*/
- private static PartitionTableDescriptor descriptor(GridH2Table tbl) {
- return new PartitionTableDescriptor(tbl.cacheName(), tbl.getName());
+ @Nullable public static GridSqlColumn unwrapColumn(GridSqlAst ast) {
+ if (ast instanceof GridSqlAlias)
+ ast = ast.child();
+
+ return ast instanceof GridSqlColumn ? (GridSqlColumn)ast : null;
}
/**
* Try to extract partitions from {@code op} assuming that it's between operation or simple range.
*
* @param op Sql operation.
+ * @param tblModel Table model.
* @return {@code PartitionSingleNode} if operation reduced to one partition,
* {@code PartitionGroupNode} if operation reduced to multiple partitions or null if operation is neither
* between nor simple range. Null also returns if it's not possible to extract partitions from given operation.
* @throws IgniteCheckedException If failed.
*/
- private PartitionNode tryExtractBetween(GridSqlOperation op) throws IgniteCheckedException {
+ private PartitionNode tryExtractBetween(GridSqlOperation op, PartitionTableModel tblModel)
+ throws IgniteCheckedException {
// Between operation (or similar range) should contain exact two children.
assert op.size() == 2;
@@ -487,11 +739,18 @@ public class PartitionExtractor {
Set<PartitionSingleNode> parts = new HashSet<>();
- PartitionTableDescriptor desc = descriptor(tbl);
+ PartitionTable tbl0 = tblModel.table(leftCol.tableAlias());
+
+ // If table is in ignored set, then we cannot use it for partition extraction.
+ if (tbl0 == null)
+ return null;
for (long i = leftLongVal; i <= rightLongVal; i++) {
- parts.add(new PartitionConstantNode(desc,
- idx.kernalContext().affinity().partition((tbl).cacheName(), i)));
+ Object constVal = H2Utils.convert(i, idx, leftCol.column().getType());
+
+ int part = idx.kernalContext().affinity().partition(tbl0.cacheName(), constVal);
+
+ parts.add(new PartitionConstantNode(tbl0, part));
if (parts.size() > maxPartsCntBetween)
return null;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionGroupNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionGroupNode.java
index ef3a154..3d66439 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionGroupNode.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionGroupNode.java
@@ -35,22 +35,6 @@ public class PartitionGroupNode implements PartitionNode {
private final Set<PartitionSingleNode> siblings;
/**
- * Merge two simple nodes.
- *
- * @param node1 Node 1.
- * @param node2 Node 2.
- * @return Group node.
- */
- public static PartitionGroupNode merge(PartitionSingleNode node1, PartitionSingleNode node2) {
- HashSet<PartitionSingleNode> nodes = new HashSet<>();
-
- nodes.add(node1);
- nodes.add(node2);
-
- return new PartitionGroupNode(nodes);
- }
-
- /**
* Constructor.
*
* @param siblings Partitions.
@@ -72,6 +56,13 @@ public class PartitionGroupNode implements PartitionNode {
return res;
}
+ /** {@inheritDoc} */
+ @Override public int joinGroup() {
+ // Note that we cannot cache join group in constructor. We have a strong invariant that all siblings always
+ // belong to the same group. However, the index of this group may change while the expression tree is traversed.
+ return siblings.iterator().next().joinGroup();
+ }
+
/**
* @return Siblings
*/
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinCondition.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinCondition.java
new file mode 100644
index 0000000..244c301
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinCondition.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.affinity;
+
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Join condition between a column of the left table and a column of the right table, or a cross-join marker.
+ */
+public class PartitionJoinCondition {
+ /** Cross JOIN marker; all aliases and column names of this instance are {@code null}. */
+ public static final PartitionJoinCondition CROSS = new PartitionJoinCondition(null, null, null, null, true);
+
+ /** Left alias. */
+ private final String leftAlias;
+
+ /** Right alias. */
+ private final String rightAlias;
+
+ /** Left column name. */
+ private final String leftCol;
+
+ /** Right column name. */
+ private final String rightCol;
+
+ /** Whether this is a cross-join. */
+ private final boolean cross;
+
+ /**
+ * Constructor for a regular (non-cross) join condition.
+ *
+ * @param leftAlias Left alias.
+ * @param rightAlias Right alias.
+ * @param leftCol Left column name.
+ * @param rightCol Right column name.
+ */
+ public PartitionJoinCondition(String leftAlias, String rightAlias, String leftCol, String rightCol) {
+ this(leftAlias, rightAlias, leftCol, rightCol, false);
+ }
+
+ /**
+ * Constructor. Private because the only cross-join instance is {@link #CROSS}.
+ *
+ * @param leftAlias Left alias.
+ * @param rightAlias Right alias.
+ * @param leftCol Left column name.
+ * @param rightCol Right column name.
+ * @param cross Whether this is a cross-join.
+ */
+ private PartitionJoinCondition(String leftAlias, String rightAlias, String leftCol, String rightCol,
+ boolean cross) {
+ this.leftAlias = leftAlias;
+ this.rightAlias = rightAlias;
+ this.leftCol = leftCol;
+ this.rightCol = rightCol;
+ this.cross = cross;
+ }
+
+ /**
+ * @return Left alias ({@code null} for {@link #CROSS}).
+ */
+ public String leftAlias() {
+ return leftAlias;
+ }
+
+ /**
+ * @return Right alias ({@code null} for {@link #CROSS}).
+ */
+ public String rightAlias() {
+ return rightAlias;
+ }
+
+ /**
+ * @return Left column ({@code null} for {@link #CROSS}).
+ */
+ public String leftColumn() {
+ return leftCol;
+ }
+
+ /**
+ * @return Right column ({@code null} for {@link #CROSS}).
+ */
+ public String rightColumn() {
+ return rightCol;
+ }
+
+ /**
+ * @return Whether this is a cross-join.
+ */
+ public boolean cross() {
+ return cross;
+ }
+
+ /** {@inheritDoc} Null-safe (like {@link #equals(Object)}), because {@link #CROSS} holds {@code null} fields. */
+ @Override public int hashCode() {
+ int res = leftAlias != null ? leftAlias.hashCode() : 0;
+
+ res = 31 * res + (rightAlias != null ? rightAlias.hashCode() : 0);
+ res = 31 * res + (leftCol != null ? leftCol.hashCode() : 0);
+ res = 31 * res + (rightCol != null ? rightCol.hashCode() : 0);
+ res = 31 * res + Boolean.hashCode(cross);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj instanceof PartitionJoinCondition) {
+ PartitionJoinCondition other = (PartitionJoinCondition)obj;
+
+ return F.eq(leftAlias, other.leftAlias) && F.eq(rightAlias, other.rightAlias) &&
+ F.eq(leftCol, other.leftCol) && F.eq(rightCol, other.rightCol) && cross == other.cross;
+ }
+
+ return false;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinGroup.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinGroup.java
new file mode 100644
index 0000000..641d013
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinGroup.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.affinity;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+
+/**
+ * Group of joined tables whose affinity function could be "merged". All tables share one affinity descriptor.
+ */
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+public class PartitionJoinGroup {
+ /** Tables within a group. Identity-based set: membership is decided by reference, not by {@code equals()}. */
+ private final Collection<PartitionTable> tbls = Collections.newSetFromMap(new IdentityHashMap<>());
+
+ /** Affinity function descriptor. */
+ private final PartitionTableAffinityDescriptor affDesc;
+
+ /**
+ * Constructor. Creates an empty group; tables are added through {@link #addTable(PartitionTable)}.
+ *
+ * @param affDesc Affinity function descriptor.
+ */
+ public PartitionJoinGroup(PartitionTableAffinityDescriptor affDesc) {
+ this.affDesc = affDesc;
+ }
+
+ /**
+ * @return Tables in a group (the internal mutable collection itself, not a copy).
+ */
+ public Collection<PartitionTable> tables() {
+ return tbls;
+ }
+
+ /**
+ * Add table to the group.
+ *
+ * @param tbl Table.
+ * @return This for chaining.
+ */
+ public PartitionJoinGroup addTable(PartitionTable tbl) {
+ tbls.add(tbl);
+
+ return this;
+ }
+
+ /**
+ * Remove table from the group.
+ *
+ * @param tbl Table.
+ * @return {@code True} if the group is empty after removal.
+ */
+ public boolean removeTable(PartitionTable tbl) {
+ tbls.remove(tbl);
+
+ return tbls.isEmpty();
+ }
+
+ /**
+ * @return Affinity descriptor.
+ */
+ public PartitionTableAffinityDescriptor affinityDescriptor() {
+ return affDesc;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java
index 238739c..7372fc2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java
@@ -35,6 +35,11 @@ public interface PartitionNode {
Collection<Integer> apply(Object... args) throws IgniteCheckedException;
/**
+ * @return Join group for the given node.
+ */
+ int joinGroup();
+
+ /**
* Try optimizing partition nodes into a simpler form.
*
* @return Optimized node or {@code this} if optimization failed.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNoneNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNoneNode.java
index b3a1358..5d4b324 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNoneNode.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNoneNode.java
@@ -42,6 +42,11 @@ public class PartitionNoneNode implements PartitionNode {
}
/** {@inheritDoc} */
+ @Override public int joinGroup() {
+ return PartitionTableModel.GRP_NONE;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(PartitionNoneNode.class, this);
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionParameterNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionParameterNode.java
index 0624f2c..e9f4880 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionParameterNode.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionParameterNode.java
@@ -45,7 +45,7 @@ public class PartitionParameterNode extends PartitionSingleNode {
* @param idx Parameter index.
* @param dataType Parameter data type.
*/
- public PartitionParameterNode(PartitionTableDescriptor tbl, IgniteH2Indexing indexing, int idx,
+ public PartitionParameterNode(PartitionTable tbl, IgniteH2Indexing indexing, int idx,
int dataType) {
super(tbl);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionResult.java
index 13e7f87..daa14d3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionResult.java
@@ -24,37 +24,36 @@ import org.apache.ignite.internal.util.typedef.internal.S;
* Partition extraction result.
*/
public class PartitionResult {
- /** Descriptor. */
- @GridToStringInclude
- private final PartitionTableDescriptor desc;
-
/** Tree. */
@GridToStringInclude
private final PartitionNode tree;
+ /** Affinity function. */
+ private final PartitionTableAffinityDescriptor aff;
+
/**
* Constructor.
*
- * @param desc Descriptor.
* @param tree Tree.
+ * @param aff Affinity function.
*/
- public PartitionResult(PartitionTableDescriptor desc, PartitionNode tree) {
- this.desc = desc;
+ public PartitionResult(PartitionNode tree, PartitionTableAffinityDescriptor aff) {
this.tree = tree;
+ this.aff = aff;
}
/**
- * Descriptor.
+ * Tree.
*/
- public PartitionTableDescriptor descriptor() {
- return desc;
+ public PartitionNode tree() {
+ return tree;
}
/**
- * Tree.
+ * @return Affinity function.
*/
- public PartitionNode tree() {
- return tree;
+ public PartitionTableAffinityDescriptor affinity() {
+ return aff;
}
/** {@inheritDoc} */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionSingleNode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionSingleNode.java
index caf966c..35e7d30 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionSingleNode.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionSingleNode.java
@@ -30,14 +30,14 @@ import java.util.Collections;
public abstract class PartitionSingleNode implements PartitionNode {
/** Table descriptor. */
@GridToStringExclude
- protected final PartitionTableDescriptor tbl;
+ protected final PartitionTable tbl;
/**
* Constructor.
*
* @param tbl Table descriptor.
*/
- protected PartitionSingleNode(PartitionTableDescriptor tbl) {
+ protected PartitionSingleNode(PartitionTable tbl) {
this.tbl = tbl;
}
@@ -59,17 +59,29 @@ public abstract class PartitionSingleNode implements PartitionNode {
*/
public abstract boolean constant();
+ /** {@inheritDoc} */
+ @Override public int joinGroup() {
+ return tbl.joinGroup();
+ }
+
/**
* @return Partition for constant node, index for argument node.
*/
public abstract int value();
+ /**
+ * @return Underlying table.
+ */
+ public PartitionTable table() {
+ return tbl;
+ }
+
/** {@inheritDoc} */
@Override public int hashCode() {
int hash = (constant() ? 1 : 0);
hash = 31 * hash + value();
- hash = 31 * hash + tbl.hashCode();
+ hash = 31 * hash + tbl.alias().hashCode();
return hash;
}
@@ -84,6 +96,7 @@ public abstract class PartitionSingleNode implements PartitionNode {
PartitionSingleNode other = (PartitionSingleNode)obj;
- return F.eq(constant(), other.constant()) && F.eq(value(), other.value()) && F.eq(tbl, other.tbl);
+ return F.eq(constant(), other.constant()) && F.eq(value(), other.value()) &&
+ F.eq(tbl.alias(), other.tbl.alias());
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTable.java
new file mode 100644
index 0000000..1b996c1
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTable.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.affinity;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Single table with affinity info: query alias, cache name, affinity column name(s) and join group index.
+ */
+public class PartitionTable {
+ /** Alias used in the query. */
+ private final String alias;
+
+ /** Cache name. */
+ private final String cacheName;
+
+ /** Affinity column name (if can be resolved). */
+ private final String affColName;
+
+ /** Second affinity column name (possible when _KEY is affinity column and an alias for this column exists). */
+ private final String secondAffColName;
+
+ /** Join group index. Mutable: assigned through {@link #joinGroup(int)}. */
+ private int joinGrp;
+
+ /**
+ * Constructor. If only {@code secondAffColName} is given, it is stored as the primary affinity column name.
+ *
+ * @param alias Unique alias.
+ * @param cacheName Cache name.
+ * @param affColName Affinity column name.
+ * @param secondAffColName Second affinity column name.
+ */
+ public PartitionTable(
+ String alias,
+ String cacheName,
+ @Nullable String affColName,
+ @Nullable String secondAffColName
+ ) {
+ this.alias = alias;
+ this.cacheName = cacheName;
+
+ if (affColName == null && secondAffColName != null) {
+ this.affColName = secondAffColName;
+ this.secondAffColName = null;
+ }
+ else {
+ this.affColName = affColName;
+ this.secondAffColName = secondAffColName;
+ }
+ }
+
+ /**
+ * @return Alias.
+ */
+ public String alias() {
+ return alias;
+ }
+
+ /**
+ * @return Cache name.
+ */
+ public String cacheName() {
+ return cacheName;
+ }
+
+ /**
+ * Check whether passed column is affinity column, i.e. equals either of the two stored affinity column names.
+ *
+ * @param colName Column name.
+ * @return {@code True} if affinity column.
+ */
+ @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+ public boolean isAffinityColumn(String colName) {
+ return F.eq(colName, affColName) || F.eq(colName, secondAffColName);
+ }
+
+ /**
+ * @return Join group index.
+ */
+ public int joinGroup() {
+ return joinGrp;
+ }
+
+ /**
+ * @param joinGrp Join group index.
+ */
+ public void joinGroup(int joinGrp) {
+ this.joinGrp = joinGrp;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PartitionTable.class, this);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableAffinityDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableAffinityDescriptor.java
new file mode 100644
index 0000000..21dab9c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableAffinityDescriptor.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.affinity;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Serializable;
+
+/**
+ * Affinity function descriptor. Used to compare affinity functions of two tables.
+ */
+public class PartitionTableAffinityDescriptor implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Affinity function type. */
+ private final PartitionAffinityFunctionType affFunc;
+
+ /** Number of partitions. */
+ private final int parts;
+
+ /** Whether node filter is set. */
+ private final boolean hasNodeFilter;
+
+ /** Data region name. */
+ private final String dataRegion;
+
+ /**
+ * Constructor.
+ *
+ * @param affFunc Affinity function type.
+ * @param parts Number of partitions.
+ * @param hasNodeFilter Whether node filter is set.
+ * @param dataRegion Data region.
+ */
+ public PartitionTableAffinityDescriptor(
+ PartitionAffinityFunctionType affFunc,
+ int parts,
+ boolean hasNodeFilter,
+ String dataRegion
+ ) {
+ this.affFunc = affFunc;
+ this.parts = parts;
+ this.hasNodeFilter = hasNodeFilter;
+ this.dataRegion = dataRegion;
+ }
+
+ /**
+ * Check if provided descriptor is compatible with this instance (i.e. can be used in the same co-location group).
+ *
+ * @param other Other descriptor.
+ * @return {@code True} if compatible; {@code false} if {@code other} is null or either side is non-RENDEZVOUS/filtered.
+ */
+ @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+ public boolean isCompatible(PartitionTableAffinityDescriptor other) {
+ if (other == null)
+ return false;
+
+ // Rendezvous affinity function is deterministic and doesn't depend on previous cluster view changes.
+ // In future other user affinity functions would be applicable as well if explicitly marked deterministic.
+ if (affFunc == PartitionAffinityFunctionType.RENDEZVOUS) {
+ // We cannot be sure that two caches are co-located if custom node filter is present.
+ // Note that technically we may try to compare two filters. However, this adds unnecessary complexity
+ // and potential deserialization issues when SQL is called from client nodes or thin clients.
+ if (!hasNodeFilter) {
+ return
+ other.affFunc == PartitionAffinityFunctionType.RENDEZVOUS &&
+ !other.hasNodeFilter &&
+ other.parts == parts &&
+ F.eq(other.dataRegion, dataRegion);
+ }
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PartitionTableAffinityDescriptor.class, this);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableDescriptor.java
deleted file mode 100644
index b11e07e..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableDescriptor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.affinity;
-
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Partition resolver.
- */
-public class PartitionTableDescriptor {
- /** Cache name. */
- private final String cacheName;
-
- /** Table name. */
- private final String tblName;
-
- /**
- * Constructor.
- *
- * @param cacheName Cache name.
- * @param tblName Table name.
- */
- public PartitionTableDescriptor(String cacheName, String tblName) {
- this.cacheName = cacheName;
- this.tblName = tblName;
- }
-
- /**
- * @return Cache name.
- */
- public String cacheName() {
- return cacheName;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return 31 * cacheName.hashCode() + tblName.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (o == this)
- return true;
-
- if (o.getClass() != getClass())
- return false;
-
- PartitionTableDescriptor other = (PartitionTableDescriptor)o;
-
- return F.eq(cacheName, other.cacheName) && F.eq(tblName, other.tblName);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(PartitionTableDescriptor.class, this);
- }
-}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableModel.java
new file mode 100644
index 0000000..6393941
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableModel.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.affinity;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Partition join model. Describes how tables are joined with each other.
+ */
+public class PartitionTableModel {
+ /** Join group which could not be applied (e.g. for "ALL" case). */
+ public static final int GRP_NONE = -1;
+
+ /** All tables observed during parsing excluding outer. */
+ private final Map<String, PartitionTable> tbls = new HashMap<>();
+
+ /** Join groups. */
+ private final Map<Integer, PartitionJoinGroup> grps = new HashMap<>();
+
+ /** Tables which are excluded from partition pruning calculation. */
+ private Set<String> excludedTblNames;
+
+ /** Group index generator */
+ private int grpIdxGen;
+
+ /**
+ * Add table.
+ *
+ * @param tbl Table.
+ * @param aff Affinity descriptor.
+ */
+ public void addTable(PartitionTable tbl, PartitionTableAffinityDescriptor aff) {
+ int grpIdx = grpIdxGen++;
+
+ tbl.joinGroup(grpIdx);
+
+ tbls.put(tbl.alias(), tbl);
+ grps.put(grpIdx, new PartitionJoinGroup(aff).addTable(tbl));
+ }
+
+ /**
+ * Get table by alias.
+ *
+ * @param alias Alias.
+ * @return Table or {@code null} if it cannot be used for partition pruning.
+ */
+ @Nullable public PartitionTable table(String alias) {
+ PartitionTable res = tbls.get(alias);
+
+ assert res != null || (excludedTblNames != null && excludedTblNames.contains(alias));
+
+ return res;
+ }
+
+ /**
+ * Add excluded table
+ *
+ * @param alias Alias.
+ */
+ public void addExcludedTable(String alias) {
+ PartitionTable tbl = tbls.remove(alias);
+
+ if (tbl != null) {
+ PartitionJoinGroup grp = grps.get(tbl.joinGroup());
+
+ assert grp != null;
+
+ if (grp.removeTable(tbl))
+ grps.remove(tbl.joinGroup());
+ }
+
+ if (excludedTblNames == null)
+ excludedTblNames = new HashSet<>();
+
+ excludedTblNames.add(alias);
+ }
+
+ /**
+ * Add equi-join condition. Two joined tables may possibly be merged into a single group.
+ *
+ * @param cond Condition.
+ */
+ public void addJoin(PartitionJoinCondition cond) {
+ PartitionTable leftTbl = tbls.get(cond.leftAlias());
+ PartitionTable rightTbl = tbls.get(cond.rightAlias());
+
+ assert leftTbl != null || (excludedTblNames != null && excludedTblNames.contains(cond.leftAlias()));
+ assert rightTbl != null || (excludedTblNames != null && excludedTblNames.contains(cond.rightAlias()));
+
+ // At least one table is excluded, return.
+ if (leftTbl == null || rightTbl == null)
+ return;
+
+ // At least one column in condition is not affinity column, return.
+ if (!leftTbl.isAffinityColumn(cond.leftColumn()) || !rightTbl.isAffinityColumn(cond.rightColumn()))
+ return;
+
+ // Remember join group of the right table as it will be changed below.
+ int rightGrpId = rightTbl.joinGroup();
+
+ PartitionJoinGroup leftGrp = grps.get(leftTbl.joinGroup());
+ PartitionJoinGroup rightGrp = grps.get(rightGrpId);
+
+ assert leftGrp != null;
+ assert rightGrp != null;
+
+ // Groups are not compatible, return.
+ if (!leftGrp.affinityDescriptor().isCompatible(rightGrp.affinityDescriptor()))
+ return;
+
+ // Safe to merge groups.
+ for (PartitionTable tbl : rightGrp.tables()) {
+ tbl.joinGroup(leftTbl.joinGroup());
+
+ leftGrp.addTable(tbl);
+ }
+
+ grps.remove(rightGrpId);
+ }
+
+ /**
+ * Get affinity descriptor for the group.
+ *
+ * @param grpId Group ID.
+ * @return Affinity descriptor or {@code null} if there is no affinity descriptor (e.g. for "NONE" result).
+ */
+ @Nullable public PartitionTableAffinityDescriptor joinGroupAffinity(int grpId) {
+ if (grpId == GRP_NONE)
+ return null;
+
+ PartitionJoinGroup grp = grps.get(grpId);
+
+ assert grp != null;
+
+ return grp.affinityDescriptor();
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index b69a011..fff12e3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -62,7 +62,6 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS;
/**
* H2 Table implementation.
@@ -101,8 +100,8 @@ public class GridH2Table extends TableBase {
/** */
private final IndexColumn affKeyCol;
- /** */
- private final int affKeyColId;
+ /** Whether affinity key column is the whole cache key. */
+ private final boolean affKeyColIsKey;
/** */
private final LongAdder size = new LongAdder();
@@ -122,6 +121,9 @@ public class GridH2Table extends TableBase {
/** Flag remove index or not when table will be destroyed. */
private volatile boolean rmIndex;
+ /** Columns with thread-safe access. */
+ private volatile Column[] safeColumns;
+
/**
* Creates table.
*
@@ -141,36 +143,8 @@ public class GridH2Table extends TableBase {
this.desc = desc;
this.cacheInfo = cacheInfo;
- if (!desc.type().customAffinityKeyMapper()) {
- String affKeyFieldName = desc.type().affinityKey();
-
- if (affKeyFieldName != null) {
- if (doesColumnExist(affKeyFieldName)) {
- int colId = getColumn(affKeyFieldName).getColumnId();
-
- if (desc.isKeyColumn(colId)) {
- affKeyCol = indexColumn(GridH2KeyValueRowOnheap.KEY_COL, SortOrder.ASCENDING);
- affKeyColId = GridH2KeyValueRowOnheap.KEY_COL;
- }
- else {
- affKeyCol = indexColumn(colId, SortOrder.ASCENDING);
- affKeyColId = colId;
- }
- }
- else {
- affKeyCol = null;
- affKeyColId = COL_NOT_EXISTS;
- }
- }
- else {
- affKeyCol = indexColumn(GridH2KeyValueRowOnheap.KEY_COL, SortOrder.ASCENDING);
- affKeyColId = GridH2KeyValueRowOnheap.KEY_COL;
- }
- }
- else {
- affKeyCol = null;
- affKeyColId = COL_NOT_EXISTS;
- }
+ affKeyCol = calculateAffinityKeyColumn();
+ affKeyColIsKey = affKeyCol != null && desc.isKeyColumn(affKeyCol.column.getColumnId());
this.rowFactory = rowFactory;
@@ -210,6 +184,36 @@ public class GridH2Table extends TableBase {
}
/**
+ * Calculate affinity key column which will be used for partition pruning and distributed joins.
+ *
+ * @return Affinity column or {@code null} if none can be used.
+ */
+ private IndexColumn calculateAffinityKeyColumn() {
+ // If custom affinity key mapper is set, we do not know how to convert _KEY to partition, return null.
+ if (desc.type().customAffinityKeyMapper())
+ return null;
+
+ String affKeyFieldName = desc.type().affinityKey();
+
+ // If explicit affinity key field is not set, then use _KEY.
+ if (affKeyFieldName == null)
+ return indexColumn(GridH2KeyValueRowOnheap.KEY_COL, SortOrder.ASCENDING);
+
+ // If explicit affinity key field is set, but is not found in the table, do not use anything.
+ if (!doesColumnExist(affKeyFieldName))
+ return null;
+
+ int colId = getColumn(affKeyFieldName).getColumnId();
+
+ // If affinity key column is either _KEY or its alias (QueryEntity.keyFieldName), normalize it to _KEY.
+ if (desc.isKeyColumn(colId))
+ return indexColumn(GridH2KeyValueRowOnheap.KEY_COL, SortOrder.ASCENDING);
+
+ // Otherwise use column as is.
+ return indexColumn(colId, SortOrder.ASCENDING);
+ }
+
+ /**
* @return {@code true} If this is a partitioned table.
*/
public boolean isPartitioned() {
@@ -230,9 +234,65 @@ public class GridH2Table extends TableBase {
* @return {@code True} if affinity key column.
*/
public boolean isColumnForPartitionPruning(Column col) {
+ return isColumnForPartitionPruning0(col, false);
+ }
+
+ /**
+ * Check whether passed column could be used for partition transfer during partition pruning on joined tables and
+ * for external affinity calculation (e.g. on thin clients).
+ * <p>
+ * Note that it is different from {@link #isColumnForPartitionPruning(Column)} method in that not every column
+ * which qualifies for partition pruning can be used by thin clients or join partition pruning logic.
+ * <p>
+ * Consider the following schema:
+ * <pre>
+ * CREATE TABLE dept (id PRIMARY KEY);
+ * CREATE TABLE emp (id, dept_id AFFINITY KEY, PRIMARY KEY(id, dept_id));
+ * </pre>
+ * For expression-based partition pruning on "emp" table on the <b>server side</b> we may use both "_KEY" and
+ * "dept_id" columns, as passing them through standard affinity workflow will yield the same result:
+ * dept_id -> part
+ * _KEY -> dept_id -> part
+ * <p>
+ * But we cannot use "_KEY" on thin client side, as it doesn't know how to extract affinity key field properly.
+ * Neither can we perform partition transfer in JOINs when "_KEY" is used.
+ * <p>
+ * This is OK as data is collocated, so we can merge partitions extracted from both tables:
+ * <pre>
+ * SELECT * FROM dept d INNER JOIN emp e ON d.id = e.dept_id WHERE e.dept_id=? AND d.id=?
+ * </pre>
+ * But this is not OK as joined data is not collocated, and tables form distinct collocation groups:
+ * <pre>
+ * SELECT * FROM dept d INNER JOIN emp e ON d.id = e._KEY WHERE e.dept_id=? AND d.id=?
+ * </pre>
+ * NB: The last query is not logically correct and will produce an empty result. However, it is correct from SQL
+ * perspective, so we should not make incorrect assumptions about partitions, as that may make the situation even worse.
+ *
+ * @param col Column.
+ * @return {@code True} if column could be used for partition extraction on both server and client sides and for
+ * partition transfer in joins.
+ */
+ public boolean isColumnForPartitionPruningStrict(Column col) {
+ return isColumnForPartitionPruning0(col, true);
+ }
+
+ /**
+ * Internal logic to check whether column qualifies for partition extraction or not.
+ *
+ * @param col Column.
+ * @param strict Strict flag.
+ * @return {@code True} if column could be used for partition.
+ */
+ private boolean isColumnForPartitionPruning0(Column col, boolean strict) {
+ if (affKeyCol == null)
+ return false;
+
int colId = col.getColumnId();
- return colId == affKeyColId || desc.isKeyColumn(colId);
+ if (colId == affKeyCol.column.getColumnId())
+ return true;
+
+ return (affKeyColIsKey || !strict) && desc.isKeyColumn(colId);
}
/**
@@ -333,6 +393,7 @@ public class GridH2Table extends TableBase {
*
* @param exclusive Exclusive flag.
*/
+ @SuppressWarnings({"LockAcquiredButNotSafelyReleased", "CallToThreadYield"})
private void lock(boolean exclusive) {
Lock l = exclusive ? lock.writeLock() : lock.readLock();
@@ -980,12 +1041,14 @@ public class GridH2Table extends TableBase {
lock(true);
try {
- int pos = columns.length;
+ Column[] safeColumns0 = safeColumns;
+
+ int pos = safeColumns0.length;
- Column[] newCols = new Column[columns.length + cols.size()];
+ Column[] newCols = new Column[safeColumns0.length + cols.size()];
// First, let's copy existing columns to new array
- System.arraycopy(columns, 0, newCols, 0, columns.length);
+ System.arraycopy(safeColumns0, 0, newCols, 0, safeColumns0.length);
// And now, let's add new columns
for (QueryField col : cols) {
@@ -1026,13 +1089,16 @@ public class GridH2Table extends TableBase {
* @param cols Columns.
* @param ifExists If EXISTS flag.
*/
+ @SuppressWarnings("ForLoopReplaceableByForEach")
public void dropColumns(List<String> cols, boolean ifExists) {
assert !ifExists || cols.size() == 1;
lock(true);
try {
- int size = columns.length;
+ Column[] safeColumns0 = safeColumns;
+
+ int size = safeColumns0.length;
for (String name : cols) {
if (!doesColumnExist(name)) {
@@ -1052,8 +1118,8 @@ public class GridH2Table extends TableBase {
int dst = 0;
- for (int i = 0; i < columns.length; i++) {
- Column column = columns[i];
+ for (int i = 0; i < safeColumns0.length; i++) {
+ Column column = safeColumns0[i];
for (String name : cols) {
if (F.eq(name, column.getName())) {
@@ -1084,7 +1150,16 @@ public class GridH2Table extends TableBase {
}
/** {@inheritDoc} */
+ @Override protected void setColumns(Column[] columns) {
+ this.safeColumns = columns;
+
+ super.setColumns(columns);
+ }
+
+ /** {@inheritDoc} */
@Override public Column[] getColumns() {
+ Column[] safeColumns0 = safeColumns;
+
Boolean insertHack = INSERT_HACK.get();
if (insertHack != null && insertHack) {
@@ -1093,15 +1168,15 @@ public class GridH2Table extends TableBase {
StackTraceElement elem = elems[2];
if (F.eq(elem.getClassName(), Insert.class.getName()) && F.eq(elem.getMethodName(), "prepare")) {
- Column[] columns0 = new Column[columns.length - 3];
+ Column[] columns0 = new Column[safeColumns0.length - 3];
- System.arraycopy(columns, 3, columns0, 0, columns0.length);
+ System.arraycopy(safeColumns0, 3, columns0, 0, columns0.length);
return columns0;
}
}
- return columns;
+ return safeColumns0;
}
/**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index fcb3424..818777c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -129,6 +129,9 @@ public class GridSqlQuerySplitter {
private boolean collocatedGrpBy;
/** */
+ private boolean distributedJoins;
+
+ /** */
private IdentityHashMap<GridSqlAst, GridSqlAlias> uniqueFromAliases = new IdentityHashMap<>();
/** Partition extractor. */
@@ -137,13 +140,15 @@ public class GridSqlQuerySplitter {
/**
* @param params Query parameters.
* @param collocatedGrpBy If it is a collocated GROUP BY query.
- * @param idx Indexing.
+ * @param distributedJoins Distributed joins flag.
+ * @param extractor Partition extractor.
*/
- public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy, IgniteH2Indexing idx) {
+ public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy, boolean distributedJoins,
+ PartitionExtractor extractor) {
this.params = params;
this.collocatedGrpBy = collocatedGrpBy;
-
- extractor = new PartitionExtractor(idx);
+ this.distributedJoins = distributedJoins;
+ this.extractor = extractor;
}
/**
@@ -207,7 +212,8 @@ public class GridSqlQuerySplitter {
qry.explain(false);
- GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(params, collocatedGrpBy, h2);
+ GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(params, collocatedGrpBy, distributedJoins,
+ h2.partitionExtractor());
// Normalization will generate unique aliases for all the table filters in FROM.
// Also it will collect all tables and schemas from the query.
@@ -262,7 +268,7 @@ public class GridSqlQuerySplitter {
twoStepQry.distributedJoins(distributedJoins);
// all map queries must have non-empty derivedPartitions to use this feature.
- twoStepQry.derivedPartitions(splitter.extractor.merge(twoStepQry.mapQueries()));
+ twoStepQry.derivedPartitions(splitter.extractor.mergeMapQueries(twoStepQry.mapQueries()));
twoStepQry.forUpdate(forUpdate);
@@ -1549,7 +1555,7 @@ public class GridSqlQuerySplitter {
map.partitioned(hasPartitionedTables(mapQry));
map.hasSubQueries(hasSubQueries);
- if (map.isPartitioned())
+ if (map.isPartitioned() && !distributedJoins)
map.derivedPartitions(extractor.extract(mapQry));
mapSqlQrys.add(map);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 62953ec..15788f2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -393,9 +393,12 @@ public class GridReduceQueryExecutor {
int timeoutMillis,
GridQueryCancel cancel,
Object[] params,
- final int[] parts,
+ int[] parts,
boolean lazy,
MvccQueryTracker mvccTracker) {
+ if (qry.isLocal() && parts != null)
+ parts = null;
+
assert !qry.mvccEnabled() || mvccTracker != null;
if (F.isEmpty(params))
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/BetweenOperationExtractPartitionSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/BetweenOperationExtractPartitionSelfTest.java
index fbdbfb0..f6e73bc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/BetweenOperationExtractPartitionSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/BetweenOperationExtractPartitionSelfTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.FieldsQueryCursor;
@@ -41,7 +40,6 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -481,22 +479,6 @@ public class BetweenOperationExtractPartitionSelfTest extends GridCommonAbstract
}
/**
- * Check custom partitions limit exceeding.
- */
- @Test
- public void testBetweenPartitionsCustomLimitExceeding() {
- try (GridTestUtils.SystemProperty ignored = new GridTestUtils.
- SystemProperty(IgniteSystemProperties.IGNITE_SQL_MAX_EXTRACTED_PARTS_FROM_BETWEEN, "4")){
-
- // Default limit (16) not exceeded.
- testBetweenConstOperator(BETWEEN_QRY, 1, 4, 4);
-
- // Default limit (16) exceeded.
- testBetweenConstOperator(BETWEEN_QRY, 1, 5, 5, EMPTY_PARTITIONS_ARRAY);
- }
- }
-
- /**
* Check range expression with constant values.
*/
@Test
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java
new file mode 100644
index 0000000..1429f3f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java
@@ -0,0 +1,1303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+/**
+ * Tests for join partition pruning.
+ */
+@SuppressWarnings("deprecation")
+@RunWith(JUnit4.class)
+public class JoinPartitionPruningSelfTest extends GridCommonAbstractTest {
+ /** Number of intercepted requests. */
+ private static final AtomicInteger INTERCEPTED_REQS = new AtomicInteger();
+
+ /** Partitions tracked during query execution. */
+ private static final ConcurrentSkipListSet<Integer> INTERCEPTED_PARTS = new ConcurrentSkipListSet<>();
+
+ /** IP finder. */
+ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder().setShared(true);
+
+ /** Client node name. */
+ private static final String CLI_NAME = "cli";
+
+ /** Memory. */
+ private static final String REGION_MEM = "mem";
+
+ /** Disk. */
+ private static final String REGION_DISK = "disk";
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ cleanPersistenceDir();
+
+ startGrid(getConfiguration("srv1"));
+ startGrid(getConfiguration("srv2"));
+ startGrid(getConfiguration("srv3"));
+
+ startGrid(getConfiguration(CLI_NAME).setClientMode(true));
+
+ client().cluster().active(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ clearIoState();
+
+ Ignite cli = client();
+
+ cli.destroyCaches(cli.cacheNames());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+ IgniteConfiguration res = super.getConfiguration(name);
+
+ res.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+ res.setCommunicationSpi(new TrackingTcpCommunicationSpi());
+
+ res.setLocalHost("127.0.0.1");
+
+ DataRegionConfiguration memRegion =
+ new DataRegionConfiguration().setName(REGION_MEM).setPersistenceEnabled(false);
+
+ DataRegionConfiguration diskRegion =
+ new DataRegionConfiguration().setName(REGION_DISK).setPersistenceEnabled(true);
+
+ res.setDataStorageConfiguration(new DataStorageConfiguration().setDataRegionConfigurations(diskRegion)
+ .setDefaultDataRegionConfiguration(memRegion));
+
+ return res;
+ }
+
+ /**
+ * Test simple join.
+ */
+ @Test
+ public void testSimpleJoin() {
+ createPartitionedTable("t1",
+ pkColumn("k1"),
+ "v2");
+
+ createPartitionedTable("t2",
+ pkColumn("k1"),
+ affinityColumn("ak2"),
+ "v3");
+
+ executeSingle("INSERT INTO t1 VALUES ('1', '1')");
+ executeSingle("INSERT INTO t2 VALUES ('1', '1', '1')");
+
+ executeSingle("INSERT INTO t1 VALUES ('2', '2')");
+ executeSingle("INSERT INTO t2 VALUES ('2', '2', '2')");
+
+ executeSingle("INSERT INTO t1 VALUES ('3', '3')");
+ executeSingle("INSERT INTO t2 VALUES ('3', '3', '3')");
+
+ executeSingle("INSERT INTO t1 VALUES ('4', '4')");
+ executeSingle("INSERT INTO t2 VALUES ('4', '4', '4')");
+
+ executeSingle("INSERT INTO t1 VALUES ('5', '5')");
+ executeSingle("INSERT INTO t2 VALUES ('5', '5', '5')");
+
+ // Key (not alias).
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ?",
+ (res) -> {
+ assertPartitions(
+ partition("t1", "1")
+ );
+ assertEquals(1, res.size());
+ assertEquals("1", res.get(0).get(0));
+ },
+ "1"
+ );
+
+ // Key (alias).
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1._KEY = ?",
+ (res) -> {
+ assertPartitions(
+ partition("t1", "2")
+ );
+ assertEquals(1, res.size());
+ assertEquals("2", res.get(0).get(0));
+ },
+ "2"
+ );
+
+ // Non-affinity key.
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t2.k1 = ?",
+ (res) -> {
+ assertNoPartitions();
+ assertEquals(1, res.size());
+ assertEquals("3", res.get(0).get(0));
+ },
+ "3"
+ );
+
+ // Affinity key.
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t2.ak2 = ?",
+ (res) -> {
+ assertPartitions(
+ partition("t2", "4")
+ );
+ assertEquals(1, res.size());
+ assertEquals("4", res.get(0).get(0));
+ },
+ "4"
+ );
+
+ // Complex key.
+ BinaryObject key = client().binary().builder("t2_key").setField("k1", "5").setField("ak2", "5").build();
+
+ List<List<?>> res = executeSingle("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t2._KEY = ?", key);
+ assertPartitions(
+ partition("t2", "5")
+ );
+ assertEquals(1, res.size());
+ assertEquals("5", res.get(0).get(0));
+ }
+
+ /**
+ * Test how partition ownership is transferred in various cases.
+ */
+ @Test
+ public void testPartitionTransfer() {
+ // First co-located table.
+ createPartitionedTable("t1",
+ pkColumn("k1"),
+ "v2"
+ );
+
+ // Second co-located table.
+ createPartitionedTable("t2",
+ pkColumn("k1"),
+ affinityColumn("ak2"),
+ "v3"
+ );
+
+ // Third co-located table.
+ createPartitionedTable("t3",
+ pkColumn("k1"),
+ affinityColumn("ak2"),
+ "v3",
+ "v4"
+ );
+
+ // Transfer through "AND".
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? AND t2.ak2 = ?",
+ (res) -> assertPartitions(
+ partition("t1", "1")
+ ),
+ "1", "1"
+ );
+
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? AND t2.ak2 = ?",
+ (res) -> assertNoRequests(),
+ "1", "2"
+ );
+
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? AND t2.ak2 IN (?, ?)",
+ (res) -> assertPartitions(
+ partition("t1", "1")
+ ),
+ "1", "1", "2"
+ );
+
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? AND t2.ak2 IN (?, ?)",
+ (res) -> assertNoRequests(),
+ "1", "2", "3"
+ );
+
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 IN (?, ?) AND t2.ak2 IN (?, ?)",
+ (res) -> assertPartitions(
+ partition("t1", "2")
+ ),
+ "1", "2", "2", "3"
+ );
+
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 IN (?, ?) AND t2.ak2 IN (?, ?)",
+ (res) -> assertNoRequests(),
+ "1", "2", "3", "4"
+ );
+
+ // Transfer through "OR".
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? OR t2.ak2 = ?",
+ (res) -> assertPartitions(
+ partition("t1", "1")
+ ),
+ "1", "1"
+ );
+
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? OR t2.ak2 = ?",
+ (res) -> assertPartitions(
+ partition("t1", "1"),
+ partition("t2", "2")
+ ),
+ "1", "2"
+ );
+
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? OR t2.ak2 IN (?, ?)",
+ (res) -> assertPartitions(
+ partition("t1", "1"),
+ partition("t2", "2")
+ ),
+ "1", "1", "2"
+ );
+
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ? OR t2.ak2 IN (?, ?)",
+ (res) -> assertPartitions(
+ partition("t1", "1"),
+ partition("t2", "2"),
+ partition("t2", "3")
+ ),
+ "1", "2", "3"
+ );
+
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 IN (?, ?) OR t2.ak2 IN (?, ?)",
+ (res) -> assertPartitions(
+ partition("t1", "1"),
+ partition("t1", "2"),
+ partition("t2", "3")
+ ),
+ "1", "2", "2", "3"
+ );
+
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 IN (?, ?) OR t2.ak2 IN (?, ?)",
+ (res) -> assertPartitions(
+ partition("t1", "1"),
+ partition("t1", "2"),
+ partition("t2", "3"),
+ partition("t2", "4")
+ ),
+ "1", "2", "3", "4"
+ );
+
+ // Multi-way co-located JOIN.
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 INNER JOIN t3 ON t1.k1 = t3.ak2 " +
+ "WHERE t1.k1 = ? AND t2.ak2 = ? AND t3.ak2 = ?",
+ (res) -> assertPartitions(
+ partition("t1", "1")
+ ),
+ "1", "1", "1"
+ );
+
+ execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 INNER JOIN t3 ON t1.k1 = t3.ak2 " +
+ "WHERE t1.k1 = ? AND t2.ak2 = ? AND t3.ak2 = ?",
+ (res) -> assertNoRequests(),
+ "1", "2", "3"
+ );
+
+ // No transfer through intermediate table.
+ execute("SELECT * FROM t1 INNER JOIN t3 ON t1.k1 = t3.v3 INNER JOIN t2 ON t3.v4 = t2.ak2 " +
+ "WHERE t1.k1 = ? AND t2.ak2 = ?",
+ (res) -> assertNoPartitions(),
+ "1", "1"
+ );
+
+ // No transfer through disjunction.
+ execute("SELECT * FROM t1 INNER JOIN t2 ON 1=1 WHERE t1.k1 = ? OR t1.k1 = t2.ak2",
+ (res) -> assertNoPartitions(),
+ "1"
+ );
+ }
+
+ /**
+  * Test cross-joins. They cannot "transfer" partitions between joined tables.
+  * <p>
+  * Both tables are populated with rows keyed '1', '2' and '3'. A condition on a single table is expected
+  * to yield that table's partition; conditions on both tables at once are expected to yield no partitions.
+  */
+ @Test
+ public void testCrossJoin() {
+     createPartitionedTable("t1",
+         pkColumn("k1"),
+         "v2");
+
+     createPartitionedTable("t2",
+         pkColumn("k1"),
+         affinityColumn("ak2"),
+         "v3");
+
+     executeSingle("INSERT INTO t1 VALUES ('1', '1')");
+     executeSingle("INSERT INTO t2 VALUES ('1', '1', '1')");
+
+     executeSingle("INSERT INTO t1 VALUES ('2', '2')");
+     executeSingle("INSERT INTO t2 VALUES ('2', '2', '2')");
+
+     executeSingle("INSERT INTO t1 VALUES ('3', '3')");
+     executeSingle("INSERT INTO t2 VALUES ('3', '3', '3')");
+
+     // Left table, should work. Exactly one row is expected, starting with the requested key.
+     execute("SELECT * FROM t1, t2 WHERE t1.k1 = ?",
+         (res) -> {
+             assertPartitions(
+                 partition("t1", "1")
+             );
+             assertEquals(1, res.size());
+             assertEquals("1", res.get(0).get(0));
+         },
+         "1"
+     );
+
+     execute("SELECT * FROM t1 INNER JOIN t2 ON 1=1 WHERE t1.k1 = ?",
+         (res) -> {
+             assertPartitions(
+                 partition("t1", "1")
+             );
+             assertEquals(1, res.size());
+             assertEquals("1", res.get(0).get(0));
+         },
+         "1"
+     );
+
+     // Right table, should work. Exactly one row is expected, starting with the requested key.
+     execute("SELECT * FROM t1, t2 WHERE t2.ak2 = ?",
+         (res) -> {
+             assertPartitions(
+                 partition("t2", "2")
+             );
+             assertEquals(1, res.size());
+             assertEquals("2", res.get(0).get(0));
+         },
+         "2"
+     );
+
+     execute("SELECT * FROM t1 INNER JOIN t2 ON 1=1 WHERE t2.ak2 = ?",
+         (res) -> {
+             assertPartitions(
+                 partition("t2", "2")
+             );
+             assertEquals(1, res.size());
+             assertEquals("2", res.get(0).get(0));
+         },
+         "2"
+     );
+
+     // Two tables, should not work.
+     execute("SELECT * FROM t1, t2 WHERE t1.k1=? AND t2.ak2 = ?",
+         (res) -> assertNoPartitions(),
+         "3", "3"
+     );
+
+     execute("SELECT * FROM t1 INNER JOIN t2 ON 1=1 WHERE t1.k1=? AND t2.ak2 = ?",
+         (res) -> assertNoPartitions(),
+         "3", "3"
+     );
+ }
+
+ /**
+  * Test non-equijoins.
+  * <p>
+  * For every comparison operator ({@code >}, {@code <}, {@code <>}) the same four cases are checked:
+  * a condition on the left table only, a condition on the right table only, a conjunction of both and
+  * a disjunction of both. A single-table condition is expected to yield that table's partition; the
+  * combined conditions are expected to yield no partitions.
+  */
+ @Test
+ public void testThetaJoin() {
+     createPartitionedTable("t1",
+         pkColumn("k1"),
+         "v2");
+
+     createPartitionedTable("t2",
+         pkColumn("k1"),
+         affinityColumn("ak2"),
+         "v3");
+
+     // Greater than.
+     execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 > t2.ak2 WHERE t1.k1 = ?",
+         (res) -> assertPartitions(
+             partition("t1", "1")
+         ),
+         "1"
+     );
+
+     // The filter is on t2, so the expected partition is computed through t2's affinity.
+     execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 > t2.ak2 WHERE t2.ak2 = ?",
+         (res) -> assertPartitions(
+             partition("t2", "1")
+         ),
+         "1"
+     );
+
+     execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 > t2.ak2 WHERE t1.k1 = ? AND t2.ak2 = ?",
+         (res) -> assertNoPartitions(),
+         "1", "1"
+     );
+
+     execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 > t2.ak2 WHERE t1.k1 = ? OR t2.ak2 = ?",
+         (res) -> assertNoPartitions(),
+         "1", "2"
+     );
+
+     // Less than.
+     execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 < t2.ak2 WHERE t1.k1 = ?",
+         (res) -> assertPartitions(
+             partition("t1", "1")
+         ),
+         "1"
+     );
+
+     execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 < t2.ak2 WHERE t2.ak2 = ?",
+         (res) -> assertPartitions(
+             partition("t2", "1")
+         ),
+         "1"
+     );
+
+     execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 < t2.ak2 WHERE t1.k1 = ? AND t2.ak2 = ?",
+         (res) -> assertNoPartitions(),
+         "1", "1"
+     );
+
+     execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 < t2.ak2 WHERE t1.k1 = ? OR t2.ak2 = ?",
+         (res) -> assertNoPartitions(),
+         "1", "2"
+     );
+
+     // Non-equal.
+     execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 <> t2.ak2 WHERE t1.k1 = ?",
+         (res) -> assertPartitions(
+             partition("t1", "1")
+         ),
+         "1"
+     );
+
+     execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 <> t2.ak2 WHERE t2.ak2 = ?",
+         (res) -> assertPartitions(
+             partition("t2", "1")
+         ),
+         "1"
+     );
+
+     execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 <> t2.ak2 WHERE t1.k1 = ? AND t2.ak2 = ?",
+         (res) -> assertNoPartitions(),
+         "1", "1"
+     );
+
+     execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 <> t2.ak2 WHERE t1.k1 = ? OR t2.ak2 = ?",
+         (res) -> assertNoPartitions(),
+         "1", "2"
+     );
+ }
+
+ /**
+  * Checks partition extraction when a PARTITIONED table is joined with a REPLICATED one.
+  */
+ @Test
+ public void testJoinWithReplicated() {
+     // Partitioned side of the join.
+     createPartitionedTable("t1", pkColumn("k1"), "v2");
+
+     // Replicated side of the join.
+     createReplicatedTable("t2", pkColumn("k1"), "v2", "v3");
+
+     // Every query below shares the same join and differs only in its WHERE clause.
+     String joinSql = "SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.k1 WHERE ";
+
+     // Conditions on the replicated table are ignored: only t1 partitions are expected.
+     execute(
+         joinSql + "t1.k1 = ? AND t2.k1 = ?",
+         res -> assertPartitions(partition("t1", "1")),
+         "1", "2"
+     );
+
+     execute(
+         joinSql + "t1.k1 IN (?, ?) AND t2.k1 = ?",
+         res -> assertPartitions(partition("t1", "1"), partition("t1", "2")),
+         "1", "2", "3"
+     );
+
+     // Disjunction with the replicated table: no partitions expected.
+     execute(
+         joinSql + "t1.k1 = ? OR t2.k1 = ?",
+         res -> assertNoPartitions(),
+         "1", "2"
+     );
+
+     // Condition on the replicated table only: no partitions expected.
+     execute(
+         joinSql + "t2.k1 = ?",
+         res -> assertNoPartitions(),
+         "1"
+     );
+ }
+
+ /**
+  * Checks joins between caches whose affinity-related settings are equal or differ in one aspect.
+  * <p>
+  * Each scenario builds two fresh configurations (they are mutated by
+  * {@link #checkAffinityFunctions(CacheConfiguration, CacheConfiguration, boolean)}) and states whether
+  * the pair is expected to be treated as compatible.
+  */
+ @Test
+ public void testJoinWithDifferentAffinityFunctions() {
+     CacheConfiguration left;
+     CacheConfiguration right;
+
+     // Partition count: equal counts are compatible, different counts are not.
+     left = cacheConfiguration(256, 1, false, false, false);
+     right = cacheConfiguration(256, 1, false, false, false);
+     checkAffinityFunctions(left, right, true);
+
+     left = cacheConfiguration(1024, 1, false, false, false);
+     right = cacheConfiguration(256, 1, false, false, false);
+     checkAffinityFunctions(left, right, false);
+
+     left = cacheConfiguration(256, 1, false, false, false);
+     right = cacheConfiguration(1024, 1, false, false, false);
+     checkAffinityFunctions(left, right, false);
+
+     // Backups: a different number of backups is still compatible.
+     left = cacheConfiguration(256, 1, false, false, false);
+     right = cacheConfiguration(256, 2, false, false, false);
+     checkAffinityFunctions(left, right, true);
+
+     // Custom affinity function on either or both sides: not compatible.
+     left = cacheConfiguration(256, 2, true, false, false);
+     right = cacheConfiguration(256, 2, false, false, false);
+     checkAffinityFunctions(left, right, false);
+
+     left = cacheConfiguration(256, 2, false, false, false);
+     right = cacheConfiguration(256, 2, true, false, false);
+     checkAffinityFunctions(left, right, false);
+
+     left = cacheConfiguration(256, 2, true, false, false);
+     right = cacheConfiguration(256, 2, true, false, false);
+     checkAffinityFunctions(left, right, false);
+
+     // Node filter on either or both sides: not compatible.
+     left = cacheConfiguration(256, 2, false, true, false);
+     right = cacheConfiguration(256, 2, false, false, false);
+     checkAffinityFunctions(left, right, false);
+
+     left = cacheConfiguration(256, 2, false, false, false);
+     right = cacheConfiguration(256, 2, false, true, false);
+     checkAffinityFunctions(left, right, false);
+
+     left = cacheConfiguration(256, 2, false, true, false);
+     right = cacheConfiguration(256, 2, false, true, false);
+     checkAffinityFunctions(left, right, false);
+
+     // Persistence: mixed is not compatible, persistent on both sides is.
+     left = cacheConfiguration(256, 2, false, false, true);
+     right = cacheConfiguration(256, 2, false, false, false);
+     checkAffinityFunctions(left, right, false);
+
+     left = cacheConfiguration(256, 2, false, false, false);
+     right = cacheConfiguration(256, 2, false, false, true);
+     checkAffinityFunctions(left, right, false);
+
+     left = cacheConfiguration(256, 2, false, false, true);
+     right = cacheConfiguration(256, 2, false, false, true);
+     checkAffinityFunctions(left, right, true);
+ }
+
+ /**
+  * Recreates caches "t1" and "t2" from the given configurations and checks partition extraction for
+  * an equi-join between them.
+  *
+  * @param ccfg1 Configuration of the first cache (table t1, affinity key field k1). Mutated by this method.
+  * @param ccfg2 Configuration of the second cache (table t2, affinity key field ak2). Mutated by this method.
+  * @param compatible Whether a disjunction over both tables is expected to produce partitions of both
+  *      tables ({@code true}) or no partitions at all ({@code false}).
+  */
+ @SuppressWarnings("unchecked")
+ private void checkAffinityFunctions(CacheConfiguration ccfg1, CacheConfiguration ccfg2, boolean compatible) {
+     Ignite cli = client();
+
+     // Drop everything left over from the previous scenario.
+     cli.destroyCaches(cli.cacheNames());
+
+     // Prepare the first cache.
+     QueryEntity entity1 = new QueryEntity(KeyClass1.class, ValueClass.class).setTableName("t1");
+
+     ccfg1.setName("t1");
+     ccfg1.setQueryEntities(Collections.singletonList(entity1));
+     ccfg1.setKeyConfiguration(new CacheKeyConfiguration(entity1.getKeyType(), "k1"));
+     ccfg1.setSqlSchema(QueryUtils.DFLT_SCHEMA);
+
+     // Prepare the second cache.
+     QueryEntity entity2 = new QueryEntity(KeyClass2.class, ValueClass.class).setTableName("t2");
+
+     ccfg2.setName("t2");
+     ccfg2.setQueryEntities(Collections.singletonList(entity2));
+     ccfg2.setKeyConfiguration(new CacheKeyConfiguration(entity2.getKeyType(), "ak2"));
+     ccfg2.setSqlSchema(QueryUtils.DFLT_SCHEMA);
+
+     cli.createCache(ccfg1);
+     cli.createCache(ccfg2);
+
+     String joinSql = "SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE ";
+
+     // A single-table condition must work regardless of compatibility.
+     execute(
+         joinSql + "t1.k1 = ?",
+         res -> assertPartitions(partition("t1", "1")),
+         "1"
+     );
+
+     execute(
+         joinSql + "t2.ak2 = ?",
+         res -> assertPartitions(partition("t2", "2")),
+         "2"
+     );
+
+     // A disjunction over both tables only produces partitions when the caches are compatible.
+     Consumer<List<List<?>>> orCheck = compatible
+         ? res -> assertPartitions(partition("t1", "1"), partition("t2", "2"))
+         : res -> assertNoPartitions();
+
+     execute(joinSql + "t1.k1 = ? OR t2.ak2 = ?", orCheck, "1", "2");
+ }
+
+ /**
+  * Builds a PARTITIONED cache configuration with the requested affinity-related settings.
+  *
+  * @param parts Number of partitions of the affinity function.
+  * @param backups Number of backups.
+  * @param customAffinity If {@code true}, {@link CustomRendezvousAffinityFunction} is used instead of the
+  *      plain {@link RendezvousAffinityFunction}.
+  * @param nodeFilter If {@code true}, a {@link CustomNodeFilter} is installed.
+  * @param persistent If {@code true}, the cache is placed into the {@code REGION_DISK} data region.
+  * @return Cache configuration.
+  */
+ private static CacheConfiguration cacheConfiguration(
+     int parts,
+     int backups,
+     boolean customAffinity,
+     boolean nodeFilter,
+     boolean persistent
+ ) {
+     RendezvousAffinityFunction aff = customAffinity
+         ? new CustomRendezvousAffinityFunction()
+         : new RendezvousAffinityFunction();
+
+     aff.setPartitions(parts);
+
+     CacheConfiguration cfg = new CacheConfiguration();
+
+     cfg.setCacheMode(CacheMode.PARTITIONED);
+     cfg.setBackups(backups);
+     cfg.setAffinity(aff);
+
+     if (nodeFilter)
+         cfg.setNodeFilter(new CustomNodeFilter());
+
+     if (persistent)
+         cfg.setDataRegionName(REGION_DISK);
+
+     return cfg;
+ }
+
+ /**
+  * Checks partition extraction when one side of the join is a subquery.
+  */
+ @Test
+ public void testJoinWithSubquery() {
+     createPartitionedTable("t1", pkColumn("k1"), "v2");
+     createPartitionedTable("t2", pkColumn("k1"), affinityColumn("ak2"), "v3");
+
+     String joinSql = "SELECT * FROM t1 INNER JOIN (SELECT * FROM t2) T2_SUB ON t1.k1 = T2_SUB.ak2 WHERE ";
+
+     // Condition on the regular table: its partition is expected.
+     execute(
+         joinSql + "t1.k1 = ?",
+         res -> assertPartitions(partition("t1", "1")),
+         "1"
+     );
+
+     // Condition on the subquery alias: no partitions are expected.
+     execute(
+         joinSql + "T2_SUB.ak2 = ?",
+         res -> assertNoPartitions(),
+         "1"
+     );
+ }
+
+ /**
+  * Checks that partitions set explicitly on the query are the ones observed in outgoing requests.
+  */
+ @Test
+ public void testExplicitPartitions() {
+     createPartitionedTable("t1", pkColumn("k1"), "v2");
+     createPartitionedTable("t2", pkColumn("k1"), affinityColumn("ak2"), "v3");
+
+     SqlFieldsQuery qry = new SqlFieldsQuery("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 " +
+         "WHERE t1.k1=? OR t2.ak2=?");
+
+     qry.setArgs("1", "2");
+     qry.setPartitions(1);
+
+     executeSqlFieldsQuery(qry);
+
+     // Only the explicitly requested partition is expected.
+     assertPartitions(1);
+ }
+
+ /**
+  * Checks partition extraction for LEFT OUTER joins, alone and combined with an INNER join.
+  */
+ @Test
+ public void testOuterJoin() {
+     createPartitionedTable("t1", pkColumn("k1"), "v2");
+     createPartitionedTable("t2", pkColumn("k1"), affinityColumn("ak2"), "v3");
+
+     String outerJoinSql = "SELECT * FROM t1 LEFT OUTER JOIN t2 ON t1.k1 = t2.ak2 WHERE ";
+
+     // Condition on the left table of the outer join.
+     execute(
+         outerJoinSql + "t1.k1 = ?",
+         res -> assertPartitions(partition("t1", "1")),
+         "1"
+     );
+
+     // Condition on the right table of the outer join: no partitions expected.
+     execute(
+         outerJoinSql + "t2.ak2 = ?",
+         res -> assertNoPartitions(),
+         "1"
+     );
+
+     // Outer join followed by an inner join; the condition is on the inner-joined alias.
+     String mixedSql1 =
+         "SELECT * FROM t1 LEFT OUTER JOIN t2 T2_1 ON t1.k1 = T2_1.ak2 INNER JOIN t2 T2_2 ON T2_1.k1 = T2_2.k1 " +
+         "WHERE T2_2.ak2 = ?";
+
+     execute(
+         mixedSql1,
+         res -> assertPartitions(partition("t2", "1")),
+         "1"
+     );
+
+     // Conditions on both aliases: only the inner-joined alias' partition is expected.
+     String mixedSql2 =
+         "SELECT * FROM t1 LEFT OUTER JOIN t2 T2_1 ON t1.k1 = T2_1.ak2 INNER JOIN t2 T2_2 ON t1.k1 = T2_2.ak2 " +
+         "WHERE T2_1.ak2 = ? AND T2_2.ak2=?";
+
+     execute(
+         mixedSql2,
+         res -> assertPartitions(partition("t2", "2")),
+         "1", "2"
+     );
+ }
+
+ /**
+  * Checks partition extraction when a table is joined with itself.
+  */
+ @Test
+ public void testSelfJoin() {
+     createPartitionedTable("t1", pkColumn("k1"), "v2");
+
+     String joinSql = "SELECT * FROM t1 A INNER JOIN t1 B ON A.k1 = B.k1 WHERE ";
+
+     // Condition on one alias.
+     execute(
+         joinSql + "A.k1 = ?",
+         res -> assertPartitions(partition("t1", "1")),
+         "1"
+     );
+
+     // Same key on both aliases.
+     execute(
+         joinSql + "A.k1 = ? AND B.k1 = ?",
+         res -> assertPartitions(partition("t1", "1")),
+         "1", "1"
+     );
+
+     // Different keys on both aliases: no requests are expected to be sent.
+     execute(
+         joinSql + "A.k1 = ? AND B.k1 = ?",
+         res -> assertNoRequests(),
+         "1", "2"
+     );
+
+     // Disjunction: partitions of both keys are expected.
+     execute(
+         joinSql + "A.k1 = ? OR B.k1 = ?",
+         res -> assertPartitions(partition("t1", "1"), partition("t1", "2")),
+         "1", "2"
+     );
+ }
+
+ /**
+  * Creates a table backed by the "partitioned" cache template.
+  *
+  * @param name Table (and cache) name.
+  * @param cols Columns: either {@link Column} descriptors or plain column names.
+  */
+ private void createPartitionedTable(String name, Object... cols) {
+     boolean replicated = false;
+
+     createTable0(name, replicated, cols);
+ }
+
+ /**
+  * Creates a table backed by the "replicated" cache template.
+  *
+  * @param name Table (and cache) name.
+  * @param cols Columns: either {@link Column} descriptors or plain column names.
+  */
+ @SuppressWarnings("SameParameterValue")
+ private void createReplicatedTable(String name, Object... cols) {
+     boolean replicated = true;
+
+     createTable0(name, replicated, cols);
+ }
+
+ /**
+  * Builds and executes a CREATE TABLE statement in which every column is VARCHAR.
+  *
+  * @param name Table name, also used as the cache name and as the prefix of the key type name.
+  * @param replicated {@code true} to use the "replicated" template, {@code false} for "partitioned".
+  * @param cols Columns: {@link Column} descriptors, or plain names for non-PK, non-affinity columns.
+  * @throws IllegalStateException If more than one affinity column is given or no PK column is given.
+  */
+ private void createTable0(String name, boolean replicated, Object... cols) {
+     List<String> pkNames = new ArrayList<>();
+     String affName = null;
+
+     StringBuilder ddl = new StringBuilder("CREATE TABLE ").append(name).append("(");
+
+     for (Object col : cols) {
+         Column desc;
+
+         if (col instanceof Column)
+             desc = (Column)col;
+         else
+             desc = new Column((String)col, false, false);
+
+         ddl.append(desc.name()).append(" VARCHAR, ");
+
+         if (desc.pk())
+             pkNames.add(desc.name());
+
+         if (!desc.affinity())
+             continue;
+
+         if (affName != null)
+             throw new IllegalStateException("Only one affinity column is allowed: " + desc.name());
+
+         affName = desc.name();
+     }
+
+     if (pkNames.isEmpty())
+         throw new IllegalStateException("No PKs!");
+
+     // Primary key clause, then the closing bracket of the column list.
+     ddl.append("PRIMARY KEY (").append(String.join(", ", pkNames)).append(")");
+
+     ddl.append(") WITH \"template=").append(replicated ? "replicated" : "partitioned");
+     ddl.append(", CACHE_NAME=").append(name);
+
+     // Affinity key and key type are only specified when an affinity column was declared.
+     if (affName != null) {
+         ddl.append(", AFFINITY_KEY=").append(affName);
+         ddl.append(", KEY_TYPE=").append(name).append("_key");
+     }
+
+     ddl.append("\"");
+
+     executeSingle(ddl.toString());
+ }
+
+ /**
+  * Runs the query as given and then with every combination of its arguments inlined into the SQL text,
+  * passing each result to the consumer.
+  *
+  * @param sql SQL with {@code ?} placeholders.
+  * @param resConsumer Consumer invoked with the result of every executed variant.
+  * @param args Query arguments, one per placeholder.
+  */
+ public void execute(String sql, Consumer<List<List<?>>> resConsumer, Object... args) {
+     System.out.println(">>> TEST COMBINATION: " + sql);
+
+     // Fully parameterized variant first.
+     resConsumer.accept(executeSingle(sql, args));
+
+     boolean hasArgs = args != null && args.length > 0;
+
+     // Then the variants with arguments inlined.
+     if (hasArgs)
+         executeCombinations0(sql, resConsumer, new HashSet<>(), args);
+
+     System.out.println();
+ }
+
+ /**
+  * Recursively replaces placeholders with argument values and executes every distinct resulting SQL.
+  * <p>
+  * On each level every remaining argument is inlined in turn in place of its {@code ?}; the query is
+  * executed with the remaining arguments unless that exact SQL text was executed before, and the
+  * recursion continues while arguments remain.
+  *
+  * @param sql SQL with {@code ?} placeholders.
+  * @param resConsumer Consumer invoked with the result of every executed variant.
+  * @param executedSqls SQL texts executed so far; updated by this method.
+  * @param args Arguments matching the remaining placeholders; must not be empty.
+  */
+ public void executeCombinations0(
+     String sql,
+     Consumer<List<List<?>>> resConsumer,
+     Set<String> executedSqls,
+     Object... args
+ ) {
+     assert args != null && args.length > 0;
+
+     // Collect positions of all placeholders.
+     List<Integer> placeholders = new ArrayList<>();
+
+     for (int idx = sql.indexOf('?'); idx != -1; idx = sql.indexOf('?', idx + 1))
+         placeholders.add(idx);
+
+     for (int i = 0; i < args.length; i++) {
+         int at = placeholders.get(i);
+
+         // Inline the i-th argument in place of the i-th placeholder.
+         String inlinedSql = sql.substring(0, at) + args[i] + sql.substring(at + 1);
+
+         // Remaining arguments: everything except the i-th one, order preserved.
+         Object[] restArgs = new Object[args.length - 1];
+
+         System.arraycopy(args, 0, restArgs, 0, i);
+         System.arraycopy(args, i + 1, restArgs, i, args.length - i - 1);
+
+         // Skip SQL texts that were already executed.
+         if (executedSqls.add(inlinedSql))
+             resConsumer.accept(executeSingle(inlinedSql, restArgs));
+
+         if (restArgs.length > 0)
+             executeCombinations0(inlinedSql, resConsumer, executedSqls, restArgs);
+     }
+ }
+
+ /**
+  * Resets the interception state, logs the query to standard output and executes it.
+  *
+  * @param sql SQL.
+  * @param args Query arguments; may be {@code null} or empty.
+  * @return Query result.
+  */
+ private List<List<?>> executeSingle(String sql, Object... args) {
+     clearIoState();
+
+     boolean hasArgs = args != null && args.length > 0;
+
+     System.out.println(hasArgs ? ">>> " + sql + " " + Arrays.toString(args) : ">>> " + sql);
+
+     SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+     if (hasArgs)
+         qry.setArgs(args);
+
+     return executeSqlFieldsQuery(qry);
+ }
+
+ /**
+  * Runs an already prepared SQL fields query through the client node and fetches all rows.
+  *
+  * @param qry Query.
+  * @return All result rows.
+  */
+ private List<List<?>> executeSqlFieldsQuery(SqlFieldsQuery qry) {
+     IgniteEx cli = client();
+
+     return cli.context().query().querySqlFields(qry, false).getAll();
+ }
+
+ /**
+  * @return Node started under the {@code CLI_NAME} name.
+  */
+ private IgniteEx client() {
+     IgniteEx cli = grid(CLI_NAME);
+
+     return cli;
+ }
+
+ /**
+ * Reset interception state: sets the intercepted requests counter to zero and clears the collection
+ * of intercepted partitions.
+ */
+ private static void clearIoState() {
+ INTERCEPTED_REQS.set(0);
+ INTERCEPTED_PARTS.clear();
+ }
+
+ /**
+  * Varargs convenience over {@link #assertPartitions(Collection)}.
+  *
+  * @param expParts Expected partitions; duplicates are collapsed.
+  */
+ private static void assertPartitions(int... expParts) {
+     Collection<Integer> sortedParts = new TreeSet<>();
+
+     for (int i = 0; i < expParts.length; i++)
+         sortedParts.add(expParts[i]);
+
+     assertPartitions(sortedParts);
+ }
+
+ /**
+  * Asserts that the set of intercepted partitions equals the expected set.
+  *
+  * @param expParts Expected partitions.
+  */
+ private static void assertPartitions(Collection<Integer> expParts) {
+     // Both sides are compared as sorted sets, so order and duplicates do not matter.
+     TreeSet<Integer> actualParts = new TreeSet<>(INTERCEPTED_PARTS);
+     TreeSet<Integer> sortedExpParts = new TreeSet<>(expParts);
+
+     String errMsg = "Unexpected partitions [exp=" + expParts + ", actual=" + actualParts + ']';
+
+     assertEquals(errMsg, sortedExpParts, actualParts);
+ }
+
+ /**
+  * Asserts that at least one request was sent and that no partitions were intercepted.
+  */
+ private static void assertNoPartitions() {
+     boolean reqsSent = INTERCEPTED_REQS.get() > 0;
+
+     assertTrue("No requests were sent.", reqsSent);
+
+     String errMsg = "Partitions are not empty: " + INTERCEPTED_PARTS;
+
+     assertTrue(errMsg, INTERCEPTED_PARTS.isEmpty());
+ }
+
+ /**
+  * Make sure there were no requests sent because we determined empty partition set.
+  */
+ private static void assertNoRequests() {
+     // Read the counter once so that the failure message reports exactly the value that was asserted.
+     long reqs = INTERCEPTED_REQS.get();
+
+     assertEquals("Requests were sent: " + reqs, 0, reqs);
+ }
+
+ /**
+  * Resolves the partition of a key through the client node's affinity for the given cache.
+  *
+  * @param cacheName Cache name.
+  * @param key Key.
+  * @return Partition the key maps to.
+  */
+ private int partition(String cacheName, Object key) {
+     IgniteEx cli = client();
+
+     return cli.affinity(cacheName).partition(key);
+ }
+
+ /**
+  * TCP communication SPI that records outgoing query requests before delegating to the parent SPI.
+  */
+ private static class TrackingTcpCommunicationSpi extends TcpCommunicationSpi {
+     /** {@inheritDoc} */
+     @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+         track(msg);
+
+         super.sendMessage(node, msg, ackC);
+     }
+
+     /**
+      * Counts the message if it wraps a {@link GridH2QueryRequest} and records the request's partitions.
+      *
+      * @param msg Outgoing message.
+      */
+     private static void track(Message msg) {
+         if (!(msg instanceof GridIoMessage))
+             return;
+
+         Object payload = ((GridIoMessage)msg).message();
+
+         if (!(payload instanceof GridH2QueryRequest))
+             return;
+
+         INTERCEPTED_REQS.incrementAndGet();
+
+         int[] parts = ((GridH2QueryRequest)payload).queryPartitions();
+
+         // Requests without explicit partitions contribute to the counter only.
+         if (F.isEmpty(parts))
+             return;
+
+         for (int part : parts)
+             INTERCEPTED_PARTS.add(part);
+     }
+ }
+
+ /**
+  * Describes a column that is part of the primary key but is not the affinity column.
+  *
+  * @param name Column name.
+  * @return PK column descriptor.
+  */
+ public Column pkColumn(String name) {
+     boolean pk = true;
+     boolean aff = false;
+
+     return new Column(name, pk, aff);
+ }
+
+ /**
+  * Describes the affinity column; it is flagged as part of the primary key as well.
+  *
+  * @param name Column name.
+  * @return Affinity column descriptor.
+  */
+ public Column affinityColumn(String name) {
+     boolean pk = true;
+     boolean aff = true;
+
+     return new Column(name, pk, aff);
+ }
+
+ /**
+ * Immutable column descriptor: column name plus flags telling whether the column belongs to the
+ * primary key and whether it is the affinity column.
+ */
+ private static class Column {
+ /** Name. */
+ private final String name;
+
+ /** PK. */
+ private final boolean pk;
+
+ /** Affinity key. */
+ private final boolean aff;
+
+ /**
+ * Constructor.
+ *
+ * @param name Name.
+ * @param pk PK flag.
+ * @param aff Affinity flag.
+ */
+ public Column(String name, boolean pk, boolean aff) {
+ this.name = name;
+ this.pk = pk;
+ this.aff = aff;
+ }
+
+ /**
+ * @return Name.
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * @return PK flag.
+ */
+ public boolean pk() {
+ return pk;
+ }
+
+ /**
+ * @return Affinity flag.
+ */
+ public boolean affinity() {
+ return aff;
+ }
+ }
+
+ /**
+ * Custom affinity function: a subclass of {@link RendezvousAffinityFunction} that adds nothing,
+ * so it differs from the parent only by its class.
+ */
+ private static class CustomRendezvousAffinityFunction extends RendezvousAffinityFunction {
+ // No-op.
+ }
+
+ /**
+  * Node filter that accepts every node.
+  */
+ private static class CustomNodeFilter implements IgnitePredicate<ClusterNode> {
+     /** {@inheritDoc} */
+     @Override public boolean apply(ClusterNode node) {
+         return true;
+     }
+ }
+
+ /**
+ * Key class 1. Used as the key type of table "t1" in {@code checkAffinityFunctions}, where
+ * {@code k1} is configured as the affinity key field.
+ */
+ @SuppressWarnings("unused")
+ private static class KeyClass1 {
+ /** Key. */
+ @QuerySqlField
+ private String k1;
+ }
+
+ /**
+ * Key class 2. Used as the key type of table "t2" in {@code checkAffinityFunctions}, where
+ * {@code ak2} is configured as the affinity key field.
+ */
+ @SuppressWarnings("unused")
+ private static class KeyClass2 {
+ /** Key. */
+ @QuerySqlField
+ private String k1;
+
+ /** Affinity key. */
+ @QuerySqlField
+ private String ak2;
+ }
+
+ /**
+ * Value class. Used as the value type of both tables created in {@code checkAffinityFunctions}.
+ */
+ @SuppressWarnings("unused")
+ private static class ValueClass {
+ /** Value. */
+ @QuerySqlField
+ private String v;
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index ce8f6cc..5ffd7fa 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -215,6 +215,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest;
import org.apache.ignite.internal.processors.query.h2.twostep.AndOperationExtractPartitionSelfTest;
import org.apache.ignite.internal.processors.query.h2.twostep.BetweenOperationExtractPartitionSelfTest;
import org.apache.ignite.internal.processors.query.h2.twostep.InOperationExtractPartitionSelfTest;
+import org.apache.ignite.internal.processors.query.h2.twostep.JoinPartitionPruningSelfTest;
import org.apache.ignite.internal.processors.sql.IgniteCachePartitionedAtomicColumnConstraintsTest;
import org.apache.ignite.internal.processors.sql.IgniteCachePartitionedTransactionalColumnConstraintsTest;
import org.apache.ignite.internal.processors.sql.IgniteCachePartitionedTransactionalSnapshotColumnConstraintTest;
@@ -527,6 +528,7 @@ import org.junit.runners.Suite;
InOperationExtractPartitionSelfTest.class,
AndOperationExtractPartitionSelfTest.class,
BetweenOperationExtractPartitionSelfTest.class,
+ JoinPartitionPruningSelfTest.class,
GridCacheDynamicLoadOnClientTest.class,
GridCacheDynamicLoadOnClientPersistentTest.class,