You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/04/26 13:25:34 UTC
[23/50] [abbrv] ignite git commit: IGNITE-4523 Allow distributed SQL
query execution over explicit set of partitions - Fixes #1858.
IGNITE-4523 Allow distributed SQL query execution over explicit set of partitions - Fixes #1858.
Signed-off-by: Sergi Vladykin <se...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ef610c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ef610c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ef610c0
Branch: refs/heads/master
Commit: 5ef610c07c947c7cf4884b946ef1649e5ce4da34
Parents: 712398e
Author: ascherbakoff <al...@gmail.com>
Authored: Tue Apr 25 14:01:33 2017 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Tue Apr 25 14:01:33 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/cache/query/Query.java | 48 ++
.../ignite/cache/query/SqlFieldsQuery.java | 26 +
.../org/apache/ignite/cache/query/SqlQuery.java | 26 +
.../processors/cache/IgniteCacheProxy.java | 14 +
.../processors/query/GridQueryProcessor.java | 4 +-
.../ignite/internal/util/GridIntIterator.java | 33 +
.../ignite/internal/util/GridIntList.java | 21 +-
.../ignite/internal/util/IgniteUtils.java | 21 +-
.../processors/query/h2/IgniteH2Indexing.java | 14 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 5 +-
.../h2/twostep/GridReduceQueryExecutor.java | 222 ++++++-
.../h2/twostep/msg/GridH2QueryRequest.java | 64 +-
...stributedPartitionQueryAbstractSelfTest.java | 655 +++++++++++++++++++
...utedPartitionQueryConfigurationSelfTest.java | 92 +++
...butedPartitionQueryNodeRestartsSelfTest.java | 114 ++++
...eCacheDistributedPartitionQuerySelfTest.java | 90 +++
.../IgniteCacheQueryNodeRestartSelfTest2.java | 8 +
.../IgniteCacheQuerySelfTestSuite.java | 6 +
18 files changed, 1419 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
index 71161e7..c9ed464 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java
@@ -18,7 +18,10 @@
package org.apache.ignite.cache.query;
import java.io.Serializable;
+import java.util.Arrays;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
@@ -93,6 +96,51 @@ public abstract class Query<R> implements Serializable {
return this;
}
+ /**
+ * Prepares the partitions.
+ *
+ * @param parts Partitions.
+ */
+ protected int[] prepare(int[] parts) {
+ if (parts == null)
+ return null;
+
+ A.notEmpty(parts, "Partitions");
+
+ boolean sorted = true;
+
+ // Try to do validation in one pass, if array is already sorted.
+ for (int i = 0; i < parts.length; i++) {
+ if (i < parts.length - 1)
+ if (parts[i] > parts[i + 1])
+ sorted = false;
+ else if (sorted)
+ validateDups(parts[i], parts[i + 1]);
+
+ A.ensure(0 <= parts[i] && parts[i] < CacheConfiguration.MAX_PARTITIONS_COUNT, "Illegal partition");
+ }
+
+ // Sort and validate again.
+ if (!sorted) {
+ Arrays.sort(parts);
+
+ for (int i = 0; i < parts.length; i++) {
+ if (i < parts.length - 1)
+ validateDups(parts[i], parts[i + 1]);
+ }
+ }
+
+ return parts;
+ }
+
+ /**
+ * @param p1 Part 1.
+ * @param p2 Part 2.
+ */
+ private void validateDups(int p1, int p2) {
+ A.ensure(p1 != p2, "Partition duplicates are not allowed: " + p1);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(Query.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 8c3a4fe..9a7211b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
/**
* SQL Fields query. This query can return specific fields of data based
@@ -70,6 +71,9 @@ public class SqlFieldsQuery extends Query<List<?>> {
/** */
private boolean replicatedOnly;
+ /** Partitions for query */
+ private int[] parts;
+
/**
* Constructs SQL fields query.
*
@@ -261,6 +265,28 @@ public class SqlFieldsQuery extends Query<List<?>> {
return replicatedOnly;
}
+ /**
+ * Gets partitions for query, in ascending order.
+ */
+ @Nullable public int[] getPartitions() {
+ return parts;
+ }
+
+ /**
+ * Sets partitions for a query.
+ * The query will be executed only on nodes which are primary for specified partitions.
+ * <p>
+ * Note what passed array'll be sorted in place for performance reasons, if it wasn't sorted yet.
+ *
+ * @param parts Partitions.
+ * @return {@code this} for chaining.
+ */
+ public SqlFieldsQuery setPartitions(@Nullable int... parts) {
+ this.parts = prepare(parts);
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SqlFieldsQuery.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
index 944c70e..a5994b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
/**
* SQL Query.
@@ -56,6 +57,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
/** */
private boolean replicatedOnly;
+ /** Partitions for query */
+ private int[] parts;
+
/**
* Constructs query for the given type name and SQL query.
*
@@ -250,6 +254,28 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
return replicatedOnly;
}
+ /**
+ * Gets partitions for query, in ascending order.
+ */
+ @Nullable public int[] getPartitions() {
+ return parts;
+ }
+
+ /**
+ * Sets partitions for a query.
+ * The query will be executed only on nodes which are primary for specified partitions.
+ * <p>
+ * Note what passed array'll be sorted in place for performance reasons, if it wasn't sorted yet.
+ *
+ * @param parts Partitions.
+ * @return {@code this} for chaining.
+ */
+ public SqlQuery setPartitions(@Nullable int... parts) {
+ this.parts = prepare(parts);
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SqlQuery.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b38520d..dfe817e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -776,6 +776,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
if (qry instanceof SqlQuery) {
final SqlQuery p = (SqlQuery)qry;
+ if (p.isReplicatedOnly() && p.getPartitions() != null)
+ throw new CacheException("Partitions are not supported in replicated only mode.");
+
+ if (p.isDistributedJoins() && p.getPartitions() != null)
+ throw new CacheException(
+ "Using both partitions and distributed JOINs is not supported for the same query");
+
if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal())
return (QueryCursor<R>)ctx.kernalContext().query().queryLocal(ctx, p,
opCtxCall != null && opCtxCall.isKeepBinary());
@@ -786,6 +793,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
if (qry instanceof SqlFieldsQuery) {
SqlFieldsQuery p = (SqlFieldsQuery)qry;
+ if (p.isReplicatedOnly() && p.getPartitions() != null)
+ throw new CacheException("Partitions are not supported in replicated only mode.");
+
+ if (p.isDistributedJoins() && p.getPartitions() != null)
+ throw new CacheException(
+ "Using both partitions and distributed JOINs is not supported for the same query");
+
if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal())
return (QueryCursor<R>)ctx.kernalContext().query().queryLocalFields(ctx, p);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 015646d..448639b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -1754,7 +1754,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
qry.getArgs(),
cctx.name());
- return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), null), keepBinary);
+ return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), qry.getPartitions()), keepBinary);
}
}, true);
}
@@ -1938,7 +1938,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
GridQueryCancel cancel = new GridQueryCancel();
final QueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry,
- idx.backupFilter(requestTopVer.get(), null), cancel);
+ idx.backupFilter(requestTopVer.get(), qry.getPartitions()), cancel);
return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntIterator.java
new file mode 100644
index 0000000..ea863e7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntIterator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+/**
+ * Iterator over integer primitives.
+ */
+public interface GridIntIterator {
+ /**
+ * @return {@code true} if the iteration has more elements.
+ */
+ public boolean hasNext();
+
+ /**
+ * @return Next int.
+ */
+ public int next();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java
index 968b88e..e5b7b1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java
@@ -582,5 +582,22 @@ public class GridIntList implements Message, Externalizable {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 2;
- }
-}
+ }
+
+ /**
+ * @return Iterator.
+ */
+ public GridIntIterator iterator() {
+ return new GridIntIterator() {
+ int c = 0;
+
+ @Override public boolean hasNext() {
+ return c < idx;
+ }
+
+ @Override public int next() {
+ return arr[c++];
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 7d7d071..59d334a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -10094,4 +10094,23 @@ public abstract class IgniteUtils {
throw new IgniteCheckedException(e);
}
}
-}
+
+ /**
+ * Returns {@link GridIntIterator} for range of primitive integers.
+ * @param start Start.
+ * @param cnt Count.
+ */
+ public static GridIntIterator forRange(final int start, final int cnt) {
+ return new GridIntIterator() {
+ int c = 0;
+
+ @Override public boolean hasNext() {
+ return c < cnt;
+ }
+
+ @Override public int next() {
+ return start + c++;
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 798ca9b..361b55b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1471,6 +1471,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param qry Query.
* @param keepCacheObj Flag to keep cache object.
* @param enforceJoinOrder Enforce join order of tables.
+ * @param parts Partitions.
* @return Iterable result.
*/
private Iterable<List<?>> runQueryTwoStep(
@@ -1480,11 +1481,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
final boolean enforceJoinOrder,
final int timeoutMillis,
final GridQueryCancel cancel,
- final Object[] params
+ final Object[] params,
+ final int[] parts
) {
return new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
- return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params);
+ return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params, parts);
}
};
}
@@ -1515,6 +1517,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fqry.setArgs(qry.getArgs());
fqry.setPageSize(qry.getPageSize());
fqry.setDistributedJoins(qry.isDistributedJoins());
+ fqry.setPartitions(qry.getPartitions());
fqry.setLocal(qry.isLocal());
if (qry.getTimeout() > 0)
@@ -1730,7 +1733,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
cancel = new GridQueryCancel();
QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
- runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel, qry.getArgs()),
+ runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel,
+ qry.getArgs(), qry.getPartitions()),
cancel);
cursor.fieldsMeta(meta);
@@ -1750,12 +1754,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (caches.isEmpty())
return; // Nothing to check
- GridCacheSharedContext sharedContext = ctx.cache().context();
+ GridCacheSharedContext sharedCtx = ctx.cache().context();
int expectedParallelism = 0;
for (int i = 0; i < caches.size(); i++) {
- GridCacheContext cctx = sharedContext.cacheContext(caches.get(i));
+ GridCacheContext cctx = sharedCtx.cacheContext(caches.get(i));
assert cctx != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index e4347b5..45d8f50 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -426,8 +426,11 @@ public class GridMapQueryExecutor {
* @param req Query request.
*/
private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException {
+ int[] qryParts = req.queryPartitions();
+
final Map<UUID,int[]> partsMap = req.partitions();
- final int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
+
+ final int[] parts = qryParts == null ? partsMap == null ? null : partsMap.get(ctx.localNodeId()) : qryParts;
assert !F.isEmpty(req.caches());
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index d307c00..3d81cb5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -73,6 +74,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.util.GridIntIterator;
+import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.CIX2;
import org.apache.ignite.internal.util.typedef.F;
@@ -80,6 +83,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Session;
@@ -113,6 +117,9 @@ public class GridReduceQueryExecutor {
private static final String MERGE_INDEX_SORTED = "merge_sorted";
/** */
+ private static final Set<ClusterNode> UNMAPPED_PARTS = Collections.emptySet();
+
+ /** */
private GridKernalContext ctx;
/** */
@@ -376,21 +383,78 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param topVer Topology version.
+ * @param cctx Cache context.
+ * @param parts Partitions.
+ */
+ private Map<ClusterNode, IntArray> stableDataNodesMap(AffinityTopologyVersion topVer,
+ final GridCacheContext<?, ?> cctx, @Nullable final int[] parts) {
+
+ Map<ClusterNode, IntArray> mapping = new HashMap<>();
+
+ // Explicit partitions mapping is not applicable to replicated cache.
+ if (cctx.isReplicated()) {
+ for (ClusterNode clusterNode : cctx.affinity().assignment(topVer).primaryPartitionNodes())
+ mapping.put(clusterNode, null);
+
+ return mapping;
+ }
+
+ List<List<ClusterNode>> assignment = cctx.affinity().assignment(topVer).assignment();
+
+ boolean needPartsFilter = parts != null;
+
+ GridIntIterator iter = needPartsFilter ? new GridIntList(parts).iterator() :
+ U.forRange(0, cctx.affinity().partitions());
+
+ while(iter.hasNext()) {
+ int partId = iter.next();
+
+ List<ClusterNode> partNodes = assignment.get(partId);
+
+ if (partNodes.size() > 0) {
+ ClusterNode prim = partNodes.get(0);
+
+ if (!needPartsFilter) {
+ mapping.put(prim, null);
+
+ continue;
+ }
+
+ IntArray partIds = mapping.get(prim);
+
+ if (partIds == null) {
+ partIds = new IntArray();
+
+ mapping.put(prim, partIds);
+ }
+
+ partIds.add(partId);
+ }
+ }
+
+ return mapping;
+ }
+
+ /**
* @param isReplicatedOnly If we must only have replicated caches.
* @param topVer Topology version.
* @param cctx Cache context for main space.
* @param extraSpaces Extra spaces.
+ * @param parts Partitions.
* @return Data nodes or {@code null} if repartitioning started and we need to retry.
*/
- private Collection<ClusterNode> stableDataNodes(
- boolean isReplicatedOnly,
- AffinityTopologyVersion topVer,
- final GridCacheContext<?, ?> cctx,
- List<Integer> extraSpaces
- ) {
- Set<ClusterNode> nodes = new HashSet<>(cctx.affinity().assignment(topVer).primaryPartitionNodes());
+ private Map<ClusterNode, IntArray> stableDataNodes(
+ boolean isReplicatedOnly,
+ AffinityTopologyVersion topVer,
+ final GridCacheContext<?, ?> cctx,
+ List<Integer> extraSpaces,
+ int[] parts) {
+ Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, parts);
- if (F.isEmpty(nodes))
+ Set<ClusterNode> nodes = map.keySet();
+
+ if (F.isEmpty(map))
throw new CacheException("Failed to find data nodes for cache: " + cctx.name());
if (!F.isEmpty(extraSpaces)) {
@@ -406,7 +470,7 @@ public class GridReduceQueryExecutor {
throw new CacheException("Queries running on replicated cache should not contain JOINs " +
"with partitioned tables [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]");
- Collection<ClusterNode> extraNodes = extraCctx.affinity().assignment(topVer).primaryPartitionNodes();
+ Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, extraCctx, parts).keySet();
if (F.isEmpty(extraNodes))
throw new CacheException("Failed to find data nodes for cache: " + extraSpace);
@@ -414,7 +478,7 @@ public class GridReduceQueryExecutor {
if (isReplicatedOnly && extraCctx.isReplicated()) {
nodes.retainAll(extraNodes);
- if (nodes.isEmpty()) {
+ if (map.isEmpty()) {
if (isPreloadingActive(cctx, extraSpaces))
return null; // Retry.
else
@@ -431,7 +495,7 @@ public class GridReduceQueryExecutor {
", cache2=" + extraSpace + "]");
}
else if (!isReplicatedOnly && !extraCctx.isReplicated()) {
- if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes))
+ if (!extraNodes.equals(nodes))
if (isPreloadingActive(cctx, extraSpaces))
return null; // Retry.
else
@@ -443,7 +507,7 @@ public class GridReduceQueryExecutor {
}
}
- return nodes;
+ return map;
}
/**
@@ -454,6 +518,7 @@ public class GridReduceQueryExecutor {
* @param timeoutMillis Timeout in milliseconds.
* @param cancel Query cancel.
* @param params Query parameters.
+ * @param parts Partitions.
* @return Rows iterator.
*/
public Iterator<List<?>> query(
@@ -463,13 +528,17 @@ public class GridReduceQueryExecutor {
boolean enforceJoinOrder,
int timeoutMillis,
GridQueryCancel cancel,
- Object[] params
+ Object[] params,
+ final int[] parts
) {
if (F.isEmpty(params))
params = EMPTY_PARAMS;
final boolean isReplicatedOnly = qry.isReplicatedOnly();
+ // Fail if all caches are replicated and explicit partitions are set.
+
+
for (int attempt = 0;; attempt++) {
if (attempt != 0) {
try {
@@ -494,11 +563,30 @@ public class GridReduceQueryExecutor {
List<Integer> extraSpaces = qry.extraCaches();
- Collection<ClusterNode> nodes;
+ Collection<ClusterNode> nodes = null;
// Explicit partition mapping for unstable topology.
Map<ClusterNode, IntArray> partsMap = null;
+ // Explicit partitions mapping for query.
+ Map<ClusterNode, IntArray> qryMap = null;
+
+ // Partitions are not supported for queries over all replicated caches.
+ if (cctx.isReplicated() && parts != null) {
+ boolean failIfReplicatedOnly = true;
+
+ for (Integer cacheId : extraSpaces) {
+ if (!cacheContext(cacheId).isReplicated()) {
+ failIfReplicatedOnly = false;
+
+ break;
+ }
+ }
+
+ if (failIfReplicatedOnly)
+ throw new CacheException("Partitions are not supported for replicated caches");
+ }
+
if (qry.isLocal())
nodes = singletonList(ctx.discovery().localNode());
else {
@@ -508,11 +596,18 @@ public class GridReduceQueryExecutor {
else {
partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
- nodes = partsMap == null ? null : partsMap.keySet();
+ if (partsMap != null) {
+ qryMap = narrowForQuery(partsMap, parts);
+
+ nodes = qryMap == null ? null : qryMap.keySet();
+ }
}
+ } else {
+ qryMap = stableDataNodes(isReplicatedOnly, topVer, cctx, extraSpaces, parts);
+
+ if (qryMap != null)
+ nodes = qryMap.keySet();
}
- else
- nodes = stableDataNodes(isReplicatedOnly, topVer, cctx, extraSpaces);
if (nodes == null)
continue; // Retry.
@@ -633,19 +728,18 @@ public class GridReduceQueryExecutor {
if (send(nodes,
new GridH2QueryRequest()
- .requestId(qryReqId)
- .topologyVersion(topVer)
- .pageSize(r.pageSize)
- .caches(qry.caches())
- .tables(distributedJoins ? qry.tables() : null)
- .partitions(convert(partsMap))
- .queries(mapQrys)
- .parameters(params)
- .flags(flags)
- .timeout(timeoutMillis),
- null,
- false)) {
-
+ .requestId(qryReqId)
+ .topologyVersion(topVer)
+ .pageSize(r.pageSize)
+ .caches(qry.caches())
+ .tables(distributedJoins ? qry.tables() : null)
+ .partitions(convert(partsMap))
+ .queries(mapQrys)
+ .parameters(params)
+ .flags(flags)
+ .timeout(timeoutMillis),
+ parts == null ? null : new ExplicitPartitionsSpecializer(qryMap),
+ false)) {
awaitAllReplies(r, nodes, cancel);
Object state = r.state.get();
@@ -1034,7 +1128,13 @@ public class GridReduceQueryExecutor {
List<ClusterNode> owners = cctx.topology().owners(p);
if (F.isEmpty(owners)) {
- if (!F.isEmpty(dataNodes(cctx.name(), NONE)))
+ // Handle special case: no mapping is configured for a partition.
+ if (F.isEmpty(cctx.affinity().assignment(NONE).get(p))) {
+ partLocs[p] = UNMAPPED_PARTS; // Mark unmapped partition.
+
+ continue;
+ }
+ else if (!F.isEmpty(dataNodes(cctx.name(), NONE)))
return null; // Retry.
throw new CacheException("Failed to find data nodes [cache=" + cctx.name() + ", part=" + p + "]");
@@ -1059,6 +1159,9 @@ public class GridReduceQueryExecutor {
for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) {
List<ClusterNode> owners = extraCctx.topology().owners(p);
+ if (partLocs[p] == UNMAPPED_PARTS)
+ continue; // Skip unmapped partitions.
+
if (F.isEmpty(owners)) {
if (!F.isEmpty(dataNodes(extraCctx.name(), NONE)))
return null; // Retry.
@@ -1090,6 +1193,9 @@ public class GridReduceQueryExecutor {
return null; // Retry.
for (Set<ClusterNode> partLoc : partLocs) {
+ if (partLoc == UNMAPPED_PARTS)
+ continue; // Skip unmapped partition.
+
partLoc.retainAll(dataNodes);
if (partLoc.isEmpty())
@@ -1105,6 +1211,10 @@ public class GridReduceQueryExecutor {
for (int p = 0; p < partLocs.length; p++) {
Set<ClusterNode> pl = partLocs[p];
+ // Skip unmapped partitions.
+ if (pl == UNMAPPED_PARTS)
+ continue;
+
assert !F.isEmpty(pl) : pl;
ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl);
@@ -1429,4 +1539,52 @@ public class GridReduceQueryExecutor {
state(e, null);
}
}
-}
+
+ /** */
+ private Map<ClusterNode, IntArray> narrowForQuery(Map<ClusterNode, IntArray> partsMap, int[] parts) {
+ if (parts == null)
+ return partsMap;
+
+ Map<ClusterNode, IntArray> cp = U.newHashMap(partsMap.size());
+
+ for (Map.Entry<ClusterNode, IntArray> entry : partsMap.entrySet()) {
+ IntArray filtered = new IntArray(parts.length);
+
+ IntArray orig = entry.getValue();
+
+ for (int i = 0; i < orig.size(); i++) {
+ int p = orig.get(i);
+
+ if (Arrays.binarySearch(parts, p) >= 0)
+ filtered.add(p);
+ }
+
+ if (filtered.size() > 0)
+ cp.put(entry.getKey(), filtered);
+ }
+
+ return cp.isEmpty() ? null : cp;
+ }
+
+ /** */
+ private static class ExplicitPartitionsSpecializer implements IgniteBiClosure<ClusterNode, Message, Message> {
+ /** Partitions map. */
+ private final Map<ClusterNode, IntArray> partsMap;
+
+ /**
+ * @param partsMap Partitions map.
+ */
+ public ExplicitPartitionsSpecializer(Map<ClusterNode, IntArray> partsMap) {
+ this.partsMap = partsMap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Message apply(ClusterNode node, Message msg) {
+ GridH2QueryRequest rq = new GridH2QueryRequest((GridH2QueryRequest)msg);
+
+ rq.queryPartitions(toArray(partsMap.get(node)));
+
+ return rq;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 9e7dcbf..6741d89 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
@@ -92,6 +93,10 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
@GridDirectMap(keyType = UUID.class, valueType = int[].class)
private Map<UUID, int[]> parts;
+ /** Query partitions. */
+ @GridToStringInclude
+ private int[] qryParts;
+
/** */
private int pageSize;
@@ -120,6 +125,32 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
private byte[] paramsBytes;
/**
+ * Required by {@link Externalizable}
+ */
+ public GridH2QueryRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param req Request.
+ * @return {@code this}.
+ */
+ public GridH2QueryRequest(GridH2QueryRequest req) {
+ this.reqId = req.reqId;
+ this.caches = req.caches;
+ this.topVer = req.topVer;
+ this.parts = req.parts;
+ this.qryParts = req.qryParts;
+ this.pageSize = req.pageSize;
+ this.qrys = req.qrys;
+ this.flags = req.flags;
+ this.tbls = req.tbls;
+ this.timeout = req.timeout;
+ this.params = req.params;
+ this.paramsBytes = req.paramsBytes;
+ }
+
+ /**
* @return Parameters.
*/
public Object[] parameters() {
@@ -225,6 +256,23 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
}
/**
+ * @return Query partitions.
+ */
+ public int[] queryPartitions() {
+ return qryParts;
+ }
+
+ /**
+ * @param qryParts Query partitions.
+ * @return {@code this}.
+ */
+ public GridH2QueryRequest queryPartitions(int[] qryParts) {
+ this.qryParts = qryParts;
+
+ return this;
+ }
+
+ /**
* @param pageSize Page size.
* @return {@code this}.
*/
@@ -403,6 +451,12 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
writer.incrementState();
+
+ case 10:
+ if (!writer.writeIntArray("qryParts", qryParts))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -496,6 +550,14 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
+
+ case 10:
+ qryParts = reader.readIntArray("qryParts");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return reader.afterMessageRead(GridH2QueryRequest.class);
@@ -508,7 +570,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 10;
+ return 11;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryAbstractSelfTest.java
new file mode 100644
index 0000000..708fb1d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryAbstractSelfTest.java
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.util.AttributeNodeFilter;
+import org.jsr166.ThreadLocalRandom8;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Abstract test for queries over explicit partitions.
+ */
+public abstract class IgniteCacheDistributedPartitionQueryAbstractSelfTest extends GridCommonAbstractTest {
+ /** Join query for test. */
+ private static final String JOIN_QRY = "select cl._KEY, de.depositId, de.regionId from " +
+ "\"cl\".Client cl, \"de\".Deposit de, \"re\".Region re where cl.clientId=de.clientId and de.regionId=re._KEY";
+
+ /** Region node attribute name. */
+ private static final String REGION_ATTR_NAME = "reg";
+
+ /** Grids count. */
+ protected static final int GRIDS_COUNT = 10;
+
+ /** IP finder. */
+ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Partitions per region distribution. */
+ protected static final int[] PARTS_PER_REGION = new int[] {10, 20, 30, 40, 24};
+
+ /** Unmapped region id. */
+ protected static final int UNMAPPED_REGION = PARTS_PER_REGION.length;
+
+ /** Clients per partition. */
+ protected static final int CLIENTS_PER_PARTITION = 1;
+
+ /** Total clients. */
+ private static final int TOTAL_CLIENTS;
+
+ /** Affinity function to use on partitioned caches. */
+ private static final AffinityFunction AFFINITY = new RegionAwareAffinityFunction();
+
+ /** Partitions count. */
+ private static final int PARTS_COUNT;
+
+ /** Regions to partitions mapping. */
+ protected static final NavigableMap<Integer, List<Integer>> REGION_TO_PART_MAP = new TreeMap<>();
+
+ /** Query threads count. */
+ protected static final int QUERY_THREADS_CNT = 4;
+
+ /** Restarting threads count. */
+ protected static final int RESTART_THREADS_CNT = 2;
+
+ /** Node stop time. */
+ protected static final int NODE_RESTART_TIME = 1_000;
+
+ static {
+ int total = 0, parts = 0, p = 0, regionId = 1;
+
+ for (int regCnt : PARTS_PER_REGION) {
+ total += regCnt * CLIENTS_PER_PARTITION;
+
+ parts += regCnt;
+
+ REGION_TO_PART_MAP.put(regionId++, Arrays.asList(p, regCnt));
+
+ p += regCnt;
+ }
+
+ /** Last region was left empty intentionally, see {@link #UNMAPPED_REGION} */
+ TOTAL_CLIENTS = total - PARTS_PER_REGION[PARTS_PER_REGION.length - 1] * CLIENTS_PER_PARTITION;
+
+ PARTS_COUNT = parts;
+ }
+
+ /** Deposits per client. */
+ public static final int DEPOSITS_PER_CLIENT = 10;
+
+ /** Rnd. */
+ protected GridRandom rnd = new GridRandom();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ MemoryConfiguration memCfg = new MemoryConfiguration();
+ memCfg.setDefaultMemoryPolicyName("default");
+ memCfg.setMemoryPolicies(new MemoryPolicyConfiguration().setName("default").setSize(20 * 1024 * 1024));
+
+ cfg.setMemoryConfiguration(memCfg);
+
+ TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+ spi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(spi);
+
+ /** Clients cache */
+ CacheConfiguration<ClientKey, Client> clientCfg = new CacheConfiguration<>();
+ clientCfg.setName("cl");
+ clientCfg.setWriteSynchronizationMode(FULL_SYNC);
+ clientCfg.setAtomicityMode(TRANSACTIONAL);
+ clientCfg.setRebalanceMode(SYNC);
+ clientCfg.setBackups(2);
+ clientCfg.setAffinity(AFFINITY);
+ clientCfg.setIndexedTypes(ClientKey.class, Client.class);
+
+ /** Deposits cache */
+ CacheConfiguration<DepositKey, Deposit> depoCfg = new CacheConfiguration<>();
+ depoCfg.setName("de");
+ depoCfg.setWriteSynchronizationMode(FULL_SYNC);
+ depoCfg.setAtomicityMode(TRANSACTIONAL);
+ depoCfg.setRebalanceMode(SYNC);
+ depoCfg.setBackups(2);
+ depoCfg.setAffinity(AFFINITY);
+ depoCfg.setIndexedTypes(DepositKey.class, Deposit.class);
+
+ /** Regions cache. Uses default affinity. */
+ CacheConfiguration<Integer, Region> regionCfg = new CacheConfiguration<>();
+ regionCfg.setName("re");
+ regionCfg.setWriteSynchronizationMode(FULL_SYNC);
+ regionCfg.setAtomicityMode(TRANSACTIONAL);
+ regionCfg.setRebalanceMode(SYNC);
+ regionCfg.setCacheMode(CacheMode.REPLICATED);
+ regionCfg.setIndexedTypes(Integer.class, Region.class);
+
+ cfg.setCacheConfiguration(clientCfg, depoCfg, regionCfg);
+
+ if ("client".equals(gridName))
+ cfg.setClientMode(true);
+ else {
+ Integer reg = regionForGrid(gridName);
+
+ cfg.setUserAttributes(F.asMap(REGION_ATTR_NAME, reg));
+
+ log().info("Assigned region " + reg + " to grid " + gridName);
+ }
+
+ return cfg;
+ }
+
+ /** */
+ private static final class RegionAwareAffinityFunction implements AffinityFunction {
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ return PARTS_COUNT;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
+ Integer regionId;
+
+ if (key instanceof RegionKey)
+ regionId = ((RegionKey)key).regionId;
+ else if (key instanceof BinaryObject) {
+ BinaryObject bo = (BinaryObject)key;
+
+ regionId = bo.field("regionId");
+ }
+ else
+ throw new IgniteException("Unsupported key for region aware affinity");
+
+ List<Integer> range = REGION_TO_PART_MAP.get(regionId);
+
+ Integer cnt = range.get(1);
+
+ return U.safeAbs(key.hashCode() % cnt) + range.get(0); // Assign partition in region's range.
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+ List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+ List<List<ClusterNode>> assignment = new ArrayList<>(PARTS_COUNT);
+
+ for (int p = 0; p < PARTS_COUNT; p++) {
+ // Get region for partition.
+ int regionId = regionForPart(p);
+
+ // Filter all nodes for region.
+ AttributeNodeFilter f = new AttributeNodeFilter(REGION_ATTR_NAME, regionId);
+
+ List<ClusterNode> regionNodes = new ArrayList<>();
+
+ for (ClusterNode node : nodes)
+ if (f.apply(node))
+ regionNodes.add(node);
+
+ final int cp = p;
+
+ Collections.sort(regionNodes, new Comparator<ClusterNode>() {
+ @Override public int compare(ClusterNode o1, ClusterNode o2) {
+ return Long.compare(hash(cp, o1), hash(cp, o2));
+ }
+ });
+
+ assignment.add(regionNodes);
+ }
+
+ return assignment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNode(UUID nodeId) {
+ // No-op.
+ }
+
+ /**
+ * @param part Partition.
+ */
+ protected int regionForPart(int part) {
+ for (Map.Entry<Integer, List<Integer>> entry : REGION_TO_PART_MAP.entrySet()) {
+ List<Integer> range = entry.getValue();
+
+ if (range.get(0) <= part && part < range.get(0) + range.get(1))
+ return entry.getKey();
+ }
+
+ throw new IgniteException("Failed to find zone for partition");
+ }
+
+ /**
+ * @param part Partition.
+ * @param obj Object.
+ */
+ private long hash(int part, Object obj) {
+ long x = ((long)part << 32) | obj.hashCode();
+ x ^= x >>> 12;
+ x ^= x << 25;
+ x ^= x >>> 27;
+ return x * 2685821657736338717L;
+ }
+ }
+
+ /**
+ * Assigns a region to grid part.
+ *
+ * @param gridName Grid name.
+ */
+ protected Integer regionForGrid(String gridName) {
+ char c = gridName.charAt(gridName.length() - 1);
+ switch (c) {
+ case '0':
+ return 1;
+ case '1':
+ case '2':
+ return 2;
+ case '3':
+ case '4':
+ case '5':
+ return 3;
+ default:
+ return 4;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ int sum1 = 0;
+ for (List<Integer> range : REGION_TO_PART_MAP.values())
+ sum1 += range.get(1);
+
+ assertEquals("Illegal partition per region distribution", PARTS_COUNT, sum1);
+
+ startGridsMultiThreaded(GRIDS_COUNT);
+
+ startGrid("client");
+
+ // Fill caches.
+ int clientId = 1;
+ int depositId = 1;
+ int regionId = 1;
+ int p = 1; // Percents counter. Log message will be printed 10 times.
+
+ try (IgniteDataStreamer<ClientKey, Client> clStr = grid(0).dataStreamer("cl");
+ IgniteDataStreamer<DepositKey, Deposit> depStr = grid(0).dataStreamer("de")) {
+ for (int cnt : PARTS_PER_REGION) {
+ // Last region was left empty intentionally.
+ if (regionId < PARTS_PER_REGION.length) {
+ for (int i = 0; i < cnt * CLIENTS_PER_PARTITION; i++) {
+ ClientKey ck = new ClientKey(clientId, regionId);
+
+ Client cl = new Client();
+ cl.firstName = "First_Name_" + clientId;
+ cl.lastName = "Last_Name_" + clientId;
+ cl.passport = clientId * 1_000;
+
+ clStr.addData(ck, cl);
+
+ for (int j = 0; j < DEPOSITS_PER_CLIENT; j++) {
+ DepositKey dk = new DepositKey(depositId++, new ClientKey(clientId, regionId));
+
+ Deposit depo = new Deposit();
+ depo.amount = ThreadLocalRandom8.current().nextLong(1_000_001);
+ depStr.addData(dk, depo);
+ }
+
+ if (clientId / (float)TOTAL_CLIENTS >= p / 10f) {
+ log().info("Loaded " + clientId + " of " + TOTAL_CLIENTS);
+
+ p++;
+ }
+
+ clientId++;
+ }
+ }
+
+ Region region = new Region();
+ region.name = "Region_" + regionId;
+ region.code = regionId * 10;
+
+ grid(0).cache("re").put(regionId, region);
+
+ regionId++;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @param orig Originator.
+ */
+ protected void doTestRegionQuery(Ignite orig) {
+ IgniteCache<ClientKey, Client> cl = orig.cache("cl");
+
+ for (int regionId = 1; regionId <= PARTS_PER_REGION.length; regionId++) {
+ SqlQuery<ClientKey, Client> qry1 = new SqlQuery<>(Client.class, "regionId=?");
+ qry1.setArgs(regionId);
+
+ List<Cache.Entry<ClientKey, Client>> clients1 = cl.query(qry1).getAll();
+
+ int expRegionCnt = regionId == 5 ? 0 : PARTS_PER_REGION[regionId - 1] * CLIENTS_PER_PARTITION;
+
+ assertEquals("Region " + regionId + " count", expRegionCnt, clients1.size());
+
+ validateClients(regionId, clients1);
+
+ // Repeat the same query with partition set condition.
+ List<Integer> range = REGION_TO_PART_MAP.get(regionId);
+
+ SqlQuery<ClientKey, Client> qry2 = new SqlQuery<>(Client.class, "1=1");
+ qry2.setPartitions(createRange(range.get(0), range.get(1)));
+
+ try {
+ List<Cache.Entry<ClientKey, Client>> clients2 = cl.query(qry2).getAll();
+
+ assertEquals("Region " + regionId + " count with partition set", expRegionCnt, clients2.size());
+
+ // Query must produce only results from single region.
+ validateClients(regionId, clients2);
+
+ if (regionId == UNMAPPED_REGION)
+ fail();
+ } catch (CacheException ignored) {
+ if (regionId != UNMAPPED_REGION)
+ fail();
+ }
+ }
+ }
+
+ /** */
+ protected int[] createRange(int start, int cnt) {
+ int[] vals = new int[cnt];
+
+ for (int i = 0; i < cnt; i++)
+ vals[i] = start + i;
+
+ return vals;
+ }
+
+ /**
+ * @param orig Originator.
+ */
+ protected void doTestPartitionsQuery(Ignite orig) {
+ IgniteCache<ClientKey, Client> cl = orig.cache("cl");
+
+ for (int regionId = 1; regionId <= PARTS_PER_REGION.length; regionId++) {
+ log().info("Running test queries for region " + regionId);
+
+ List<Integer> range = REGION_TO_PART_MAP.get(regionId);
+
+ int[] parts = createRange(range.get(0), range.get(1));
+
+ int off = rnd.nextInt(parts.length);
+
+ int p1 = parts[off], p2 = parts[(off + (1 + rnd.nextInt(parts.length-1))) % parts.length];
+
+ log().info("Parts: " + p1 + " " + p2);
+
+ SqlQuery<ClientKey, Client> qry = new SqlQuery<>(Client.class, "1=1");
+
+ qry.setPartitions(p1, p2);
+
+ try {
+ List<Cache.Entry<ClientKey, Client>> clients = cl.query(qry).getAll();
+
+ // Query must produce only results from two partitions.
+ for (Cache.Entry<ClientKey, Client> client : clients) {
+ int p = orig.affinity("cl").partition(client.getKey());
+
+ assertTrue("Incorrect partition for key", p == p1 || p == p2);
+ }
+
+ if (regionId == UNMAPPED_REGION)
+ fail();
+ } catch (CacheException ignored) {
+ if (regionId != UNMAPPED_REGION)
+ fail();
+ }
+ }
+ }
+
+ /**
+ * @param orig Query originator.
+ * @param regionIds Region ids.
+ */
+ protected void doTestJoinQuery(Ignite orig, int... regionIds) {
+ IgniteCache<ClientKey, Client> cl = orig.cache("cl");
+
+ if (regionIds == null) {
+ regionIds = new int[PARTS_PER_REGION.length];
+
+ for (int i = 0; i < regionIds.length; i++)
+ regionIds[i] = i + 1;
+ }
+
+ for (int regionId : regionIds) {
+ List<Integer> range = REGION_TO_PART_MAP.get(regionId);
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(JOIN_QRY);
+
+ int[] pSet = createRange(range.get(0), 1 + rnd.nextInt(range.get(1) - 1));
+
+ qry.setPartitions(pSet);
+
+ try {
+ List<List<?>> rows = cl.query(qry).getAll();
+
+ for (List<?> row : rows) {
+ ClientKey key = (ClientKey)row.get(0);
+
+ int p = orig.affinity("cl").partition(key);
+
+ assertTrue(Arrays.binarySearch(pSet, p) >= 0);
+ }
+
+ // Query must produce only results from single region.
+ for (List<?> row : rows)
+ assertEquals("Region id", regionId, ((Integer)row.get(2)).intValue());
+
+ if (regionId == UNMAPPED_REGION)
+ fail();
+ }
+ catch (CacheException ignored) {
+ if (X.hasCause(ignored, InterruptedException.class, IgniteInterruptedCheckedException.class))
+ return; // Allow interruptions.
+
+ if (regionId != UNMAPPED_REGION)
+ fail();
+ }
+ }
+ }
+
+ /**
+ * @param regionId Region id.
+ * @param clients Clients.
+ */
+ protected void validateClients(int regionId, List<Cache.Entry<ClientKey, Client>> clients) {
+ for (Cache.Entry<ClientKey, Client> entry : clients) {
+ List<Integer> range = REGION_TO_PART_MAP.get(regionId);
+
+ int start = range.get(0) * CLIENTS_PER_PARTITION;
+ int end = start + range.get(1) * CLIENTS_PER_PARTITION;
+
+ int clientId = entry.getKey().clientId;
+
+ assertTrue("Client id in range", start < clientId && start <= end);
+ }
+ }
+
+ /** */
+ protected static class ClientKey extends RegionKey {
+ /** Client id. */
+ @QuerySqlField(index = true)
+ protected int clientId;
+
+ /**
+ * @param clientId Client id.
+ * @param regionId Region id.
+ */
+ public ClientKey(int clientId, int regionId) {
+ this.clientId = clientId;
+ this.regionId = regionId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ClientKey clientKey = (ClientKey)o;
+
+ return clientId == clientKey.clientId;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return clientId;
+ }
+ }
+
+ /** */
+ protected static class DepositKey extends RegionKey {
+ @QuerySqlField(index = true)
+ protected int depositId;
+
+ @QuerySqlField(index = true)
+ protected int clientId;
+
+ /** Client id. */
+ @AffinityKeyMapped
+ protected ClientKey clientKey;
+
+ /**
+ * @param depositId Client id.
+ * @param clientKey Client key.
+ */
+ public DepositKey(int depositId, ClientKey clientKey) {
+ this.depositId = depositId;
+ this.clientId = clientKey.clientId;
+ this.regionId = clientKey.regionId;
+ this.clientKey = clientKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ DepositKey that = (DepositKey)o;
+
+ return depositId == that.depositId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return depositId;
+ }
+ }
+
+ /** */
+ protected static class RegionKey implements Serializable {
+ /** Region id. */
+ @QuerySqlField(index = true)
+ protected int regionId;
+ }
+
+ /** */
+ protected static class Client {
+ @QuerySqlField
+ protected String firstName;
+
+ @QuerySqlField
+ protected String lastName;
+
+ @QuerySqlField(index = true)
+ protected int passport;
+ }
+
+ /** */
+ protected static class Deposit {
+ @QuerySqlField
+ protected long amount;
+ }
+
+ /** */
+ protected static class Region {
+ @QuerySqlField
+ protected String name;
+
+ @QuerySqlField
+ protected int code;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryConfigurationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryConfigurationSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryConfigurationSelfTest.java
new file mode 100644
index 0000000..0253fe8
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryConfigurationSelfTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Arrays;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests cache query configuration.
+ */
+public class IgniteCacheDistributedPartitionQueryConfigurationSelfTest extends GridCommonAbstractTest {
+ /** Tests partition validation. */
+ public void testPartitions() {
+ final SqlFieldsQuery qry = new SqlFieldsQuery("select 1");
+
+ // Empty set is not allowed.
+ failIfNotThrown(new Runnable() {
+ @Override public void run() {
+ qry.setPartitions();
+ }
+ });
+
+ // Duplicates are not allowed.
+ failIfNotThrown(new Runnable() {
+ @Override public void run() {
+ qry.setPartitions(0, 1, 2, 1);
+ }
+ });
+
+ // Values out of range are not allowed.
+ failIfNotThrown(new Runnable() {
+ @Override public void run() {
+ qry.setPartitions(-1, 0, 1);
+ }
+ });
+
+ // Duplicates with unordered input are not allowed.
+ failIfNotThrown(new Runnable() {
+ @Override public void run() {
+ qry.setPartitions(3, 2, 2);
+ }
+ });
+
+ // Values out of range are not allowed.
+ failIfNotThrown(new Runnable() {
+ @Override public void run() {
+ qry.setPartitions(-1, 0, 1);
+ }
+ });
+
+ // Expecting ordered set.
+ int[] tmp = new int[] {6, 2 ,3};
+ qry.setPartitions(tmp);
+
+ assertTrue(Arrays.equals(new int[]{2, 3, 6}, tmp));
+
+ // If already ordered expecting same instance.
+ qry.setPartitions((tmp = new int[] {0, 1, 2}));
+
+ assertTrue(tmp == qry.getPartitions());
+ }
+
+ /**
+ * @param r Runnable.
+ */
+ private void failIfNotThrown(Runnable r) {
+ try {
+ r.run();
+
+ fail();
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java
new file mode 100644
index 0000000..68f9842
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+
+/**
+ * Tests distributed queries over set of partitions on unstable topology.
+ */
+public class IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest
+ extends IgniteCacheDistributedPartitionQueryAbstractSelfTest {
+ /**
+ * Tests join query within region on unstable topology.
+ */
+ public void testJoinQueryUnstableTopology() throws Exception {
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ final AtomicIntegerArray states = new AtomicIntegerArray(GRIDS_COUNT);
+
+ final Ignite client = grid("client");
+
+ final AtomicInteger cnt = new AtomicInteger();
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+ @Override
+ public void run() {
+ while (!stop.get()) {
+ doTestJoinQuery(client, rnd.nextInt(PARTS_PER_REGION.length) + 1);
+
+ int cur = cnt.incrementAndGet();
+
+ if (cur % 100 == 0)
+ log().info("Queries count: " + cur);
+ }
+ }
+ }, QUERY_THREADS_CNT);
+
+ final AtomicIntegerArray restartStats = new AtomicIntegerArray(GRIDS_COUNT);
+
+ IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ while (!stop.get()) {
+ int grid = rnd.nextInt(GRIDS_COUNT);
+
+ String name = getTestIgniteInstanceName(grid);
+
+ Integer regionId = regionForGrid(name);
+
+ // Restart nodes only from region with enough number of nodes.
+ if (regionId != 3 && regionId != 4)
+ continue;
+
+ if (states.compareAndSet(grid, 0, 1)) {
+ restartStats.incrementAndGet(grid);
+
+ try {
+ stopGrid(grid);
+
+ Thread.sleep(rnd.nextInt(NODE_RESTART_TIME));
+
+ startGrid(grid);
+
+ Thread.sleep(rnd.nextInt(NODE_RESTART_TIME));
+ } finally {
+ states.set(grid, 0);
+ }
+ }
+ }
+
+ return null;
+ }
+ }, RESTART_THREADS_CNT);
+
+ try {
+ fut2.get(60, TimeUnit.SECONDS);
+ } catch (IgniteFutureTimeoutCheckedException ignored) {
+ stop.set(true);
+ }
+
+ try {
+ fut.get();
+ } finally {
+ log().info("Queries count: " + cnt.get());
+
+ for (int i = 0; i < GRIDS_COUNT; i++)
+ log().info("Grid [name = " + getTestIgniteInstanceName(i) + ", idx=" + i + " ] restarts count: " +
+ restartStats.get(i));
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQuerySelfTest.java
new file mode 100644
index 0000000..00c3848
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedPartitionQuerySelfTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Arrays;
+import java.util.List;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+
+/**
+ * Tests distributed queries over set of partitions on stable topology.
+ */
+public class IgniteCacheDistributedPartitionQuerySelfTest extends IgniteCacheDistributedPartitionQueryAbstractSelfTest {
+ /** Tests query within region. */
+ public void testRegionQuery() {
+ doTestRegionQuery(grid(0));
+ }
+
+ /** Tests query within region (client). */
+ public void testRegionQueryClient() throws Exception {
+ doTestRegionQuery(grid("client"));
+ }
+
+ /** Test query within partitions. */
+ public void testPartitionsQuery() {
+ doTestPartitionsQuery(grid(0));
+ }
+
+ /** Test query within partitions (client). */
+ public void testPartitionsQueryClient() throws Exception {
+ doTestPartitionsQuery(grid("client"));
+ }
+
+ /** Tests join query within region. */
+ public void testJoinQuery() {
+ doTestJoinQuery(grid(0));
+ }
+
+ /** Tests join query within region. */
+ public void testJoinQueryClient() throws Exception {
+ doTestJoinQuery(grid("client"));
+ }
+
+ /** Tests local query over partitions. */
+ public void testLocalQuery() {
+ Affinity<Object> affinity = grid(0).affinity("cl");
+
+ int[] parts = affinity.primaryPartitions(grid(0).localNode());
+
+ Arrays.sort(parts);
+
+ IgniteCache<ClientKey, Client> cl = grid(0).cache("cl");
+
+ SqlQuery<ClientKey, Client> qry1 = new SqlQuery<>(Client.class, "1=1");
+ qry1.setLocal(true);
+ qry1.setPartitions(parts[0]);
+
+ List<Cache.Entry<ClientKey, Client>> clients = cl.query(qry1).getAll();
+
+ for (Cache.Entry<ClientKey, Client> client : clients)
+ assertEquals("Incorrect partition", parts[0], affinity.partition(client.getKey()));
+
+ SqlFieldsQuery qry2 = new SqlFieldsQuery("select cl._KEY, cl._VAL from \"cl\".Client cl");
+ qry2.setLocal(true);
+ qry2.setPartitions(parts[0]);
+
+ List<List<?>> rows = cl.query(qry2).getAll();
+
+ for (List<?> row : rows)
+ assertEquals("Incorrect partition", parts[0], affinity.partition(row.get(0)));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
index 6fc9c39..001f40b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
@@ -33,6 +33,8 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -89,6 +91,12 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
+ MemoryConfiguration memCfg = new MemoryConfiguration();
+ memCfg.setDefaultMemoryPolicyName("default");
+ memCfg.setMemoryPolicies(new MemoryPolicyConfiguration().setName("default").setSize(50 * 1024 * 1024));
+
+ c.setMemoryConfiguration(memCfg);
+
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(ipFinder);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ef610c0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 862d1a2..032e544 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -68,6 +68,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheA
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicNearEnabledFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicNearEnabledQuerySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedPartitionQuerySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedQueryCancelSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQueryP2PEnabledSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQuerySelfTest;
@@ -75,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheP
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQuerySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedSnapshotEnabledQuerySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryAbstractDistributedJoinSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedPartitionQueryConfigurationSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNoRebalanceSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryROSelfTest;
@@ -277,6 +280,9 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteQueryDedicatedPoolTest.class);
suite.addTestSuite(IgniteSqlEntryCacheModeAgnosticTest.class);
suite.addTestSuite(QueryEntityCaseMismatchTest.class);
+ suite.addTestSuite(IgniteCacheDistributedPartitionQuerySelfTest.class);
+ suite.addTestSuite(IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.class);
+ suite.addTestSuite(IgniteCacheDistributedPartitionQueryConfigurationSelfTest.class);
return suite;
}