You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/20 08:04:12 UTC
[04/50] [abbrv] ignite git commit: IGNITE-7166: SQL: fixed affinity
nodes processing for REPLICATED caches. This closes #3202.
IGNITE-7166: SQL: fixed affinity nodes processing for REPLICATED caches. This closes #3202.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e3bbc984
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e3bbc984
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e3bbc984
Branch: refs/heads/ignite-zk
Commit: e3bbc984c1acd65ad9d822ad6fb634221d1a5a2d
Parents: 7cf049e
Author: devozerov <pp...@gmail.com>
Authored: Tue Dec 12 20:07:07 2017 +0300
Committer: devozerov <pp...@gmail.com>
Committed: Tue Dec 12 20:07:07 2017 +0300
----------------------------------------------------------------------
.../processors/affinity/AffinityAssignment.java | 5 +
.../affinity/GridAffinityAssignment.java | 45 ++++--
.../affinity/HistoryAffinityAssignment.java | 15 ++
.../h2/twostep/GridReduceQueryExecutor.java | 44 +++---
...ldsQueryJoinNoPrimaryPartitionsSelfTest.java | 151 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
6 files changed, 223 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3bbc984/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
index 06207d3..f78ab60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -66,6 +66,11 @@ public interface AffinityAssignment {
public HashSet<UUID> getIds(int part);
/**
+ * @return Nodes having primary or backup partition assignments.
+ */
+ public Set<ClusterNode> nodes();
+
+ /**
* @return Nodes having primary partitions assignments.
*/
public Set<ClusterNode> primaryPartitionNodes();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3bbc984/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 35130a3..6da6aaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Cached affinity calculations.
*/
+@SuppressWarnings("ForLoopReplaceableByForEach")
public class GridAffinityAssignment implements AffinityAssignment, Serializable {
/** */
private static final long serialVersionUID = 0L;
@@ -51,6 +52,9 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
/** Assignment node IDs */
private transient volatile List<HashSet<UUID>> assignmentIds;
+ /** Nodes having primary or backup partition assignments. */
+ private transient volatile Set<ClusterNode> nodes;
+
/** Nodes having primary partitions assignments. */
private transient volatile Set<ClusterNode> primaryPartsNodes;
@@ -182,29 +186,44 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
return assignmentIds0.get(part);
}
- /**
- * @return Nodes having primary partitions assignments.
- */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- public Set<ClusterNode> primaryPartitionNodes() {
- Set<ClusterNode> primaryPartsNodes0 = primaryPartsNodes;
+ /** {@inheritDoc} */
+ @Override public Set<ClusterNode> nodes() {
+ Set<ClusterNode> res = nodes;
- if (primaryPartsNodes0 == null) {
- int parts = assignment.size();
+ if (res == null) {
+ res = new HashSet<>();
+
+ for (int p = 0; p < assignment.size(); p++) {
+ List<ClusterNode> nodes = assignment.get(p);
+
+ if (nodes.size() > 0)
+ res.addAll(nodes);
+ }
+
+ nodes = res;
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<ClusterNode> primaryPartitionNodes() {
+ Set<ClusterNode> res = primaryPartsNodes;
- primaryPartsNodes0 = new HashSet<>();
+ if (res == null) {
+ res = new HashSet<>();
- for (int p = 0; p < parts; p++) {
+ for (int p = 0; p < assignment.size(); p++) {
List<ClusterNode> nodes = assignment.get(p);
if (nodes.size() > 0)
- primaryPartsNodes0.add(nodes.get(0));
+ res.add(nodes.get(0));
}
- primaryPartsNodes = primaryPartsNodes0;
+ primaryPartsNodes = res;
}
- return primaryPartsNodes0;
+ return res;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3bbc984/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
index e502dd5..94eaab4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -30,6 +30,7 @@ import java.util.UUID;
/**
*
*/
+@SuppressWarnings("ForLoopReplaceableByForEach")
public class HistoryAffinityAssignment implements AffinityAssignment {
/** */
private final AffinityTopologyVersion topVer;
@@ -97,6 +98,20 @@ public class HistoryAffinityAssignment implements AffinityAssignment {
}
/** {@inheritDoc} */
+ @Override public Set<ClusterNode> nodes() {
+ Set<ClusterNode> res = new HashSet<>();
+
+ for (int p = 0; p < assignment.size(); p++) {
+ List<ClusterNode> nodes = assignment.get(p);
+
+ if (!F.isEmpty(nodes))
+ res.addAll(nodes);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
@Override public Set<ClusterNode> primaryPartitionNodes() {
Set<ClusterNode> res = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3bbc984/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8e994aa..29e9b27 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -400,7 +400,7 @@ public class GridReduceQueryExecutor {
// Explicit partitions mapping is not applicable to replicated cache.
if (cctx.isReplicated()) {
- for (ClusterNode clusterNode : cctx.affinity().assignment(topVer).primaryPartitionNodes())
+ for (ClusterNode clusterNode : cctx.affinity().assignment(topVer).nodes())
mapping.put(clusterNode, null);
return mapping;
@@ -478,35 +478,27 @@ public class GridReduceQueryExecutor {
if (F.isEmpty(extraNodes))
throw new CacheException("Failed to find data nodes for cache: " + extraCacheName);
- if (isReplicatedOnly && extraCctx.isReplicated()) {
- nodes.retainAll(extraNodes);
+ boolean disjoint;
- if (map.isEmpty()) {
- if (isPreloadingActive(cacheIds))
- return null; // Retry.
- else
- throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
- ", cache2=" + extraCacheName + "]");
+ if (extraCctx.isReplicated()) {
+ if (isReplicatedOnly) {
+ nodes.retainAll(extraNodes);
+
+ disjoint = map.isEmpty();
}
- }
- else if (!isReplicatedOnly && extraCctx.isReplicated()) {
- if (!extraNodes.containsAll(nodes))
- if (isPreloadingActive(cacheIds))
- return null; // Retry.
- else
- throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
- ", cache2=" + extraCacheName + "]");
- }
- else if (!isReplicatedOnly && !extraCctx.isReplicated()) {
- if (!extraNodes.equals(nodes))
- if (isPreloadingActive(cacheIds))
- return null; // Retry.
- else
- throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
- ", cache2=" + extraCacheName + "]");
+ else
+ disjoint = !extraNodes.containsAll(nodes);
}
else
- throw new IllegalStateException();
+ disjoint = !extraNodes.equals(nodes);
+
+ if (disjoint) {
+ if (isPreloadingActive(cacheIds))
+ return null; // Retry.
+ else
+ throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
+ ", cache2=" + extraCacheName + "]");
+ }
}
return map;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3bbc984/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest.java
new file mode 100644
index 0000000..a5b3706
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.replicated;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests non-collocated join with REPLICATED cache and no primary partitions for that cache on some nodes.
+ */
+@SuppressWarnings("unused")
+public class IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest extends GridCommonAbstractTest {
+ /** Client node name. */
+ public static final String NODE_CLI = "client";
+
+ /** Name of the PARTITIONED cache. */
+ public static final String CACHE_PARTITIONED = "partitioned";
+
+ /** Name of the REPLICATED cache. */
+ public static final String CACHE_REPLICATED = "replicated";
+
+ /** Number of entries put into the REPLICATED cache; also passed to its affinity function. */
+ public static final int REP_CNT = 3;
+
+ /** Number of entries put into the PARTITIONED cache. */
+ public static final int PART_CNT = 100;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setClientMode(F.eq(NODE_CLI, igniteInstanceName));
+
+ CacheConfiguration<Integer, PartValue> ccfg1 = new CacheConfiguration<>(CACHE_PARTITIONED);
+
+ ccfg1.setCacheMode(PARTITIONED);
+ ccfg1.setAtomicityMode(TRANSACTIONAL);
+ ccfg1.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg1.setIndexedTypes(Integer.class, PartValue.class);
+ ccfg1.setSqlSchema(QueryUtils.DFLT_SCHEMA);
+
+ CacheConfiguration<Integer, RepValue> ccfg2 = new CacheConfiguration<>(CACHE_REPLICATED);
+
+ ccfg2.setAffinity(new RendezvousAffinityFunction(false, REP_CNT));
+ ccfg2.setCacheMode(REPLICATED);
+ ccfg2.setAtomicityMode(TRANSACTIONAL);
+ ccfg2.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg2.setIndexedTypes(Integer.class, RepValue.class);
+ ccfg2.setSqlSchema(QueryUtils.DFLT_SCHEMA);
+
+ cfg.setCacheConfiguration(ccfg1, ccfg2);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGridsMultiThreaded(3);
+
+ Ignite cli = startGrid(NODE_CLI);
+
+ for (int i = 0; i < REP_CNT; i++)
+ cli.cache(CACHE_REPLICATED).put(i, new RepValue(i));
+
+ for (int i = 0; i < PART_CNT; i++)
+ cli.cache(CACHE_PARTITIONED).put(i, new PartValue(i, ((i + 1) % REP_CNT)));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Test non-collocated join.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoinNonCollocated() throws Exception {
+ SqlFieldsQuery qry = new SqlFieldsQuery("SELECT COUNT(*) FROM PartValue p, RepValue r WHERE p.repId=r.id");
+
+ long cnt = (Long)grid(NODE_CLI).cache(CACHE_PARTITIONED).query(qry).getAll().get(0).get(0);
+
+ assertEquals(PART_CNT, cnt);
+ }
+
+ /**
+ * Value for PARTITIONED cache.
+ */
+ public static class PartValue {
+ /** Id. */
+ @QuerySqlField
+ private int id;
+
+ /** Rep id. */
+ @QuerySqlField
+ private int repId;
+
+ /**
+ * @param id Id.
+ * @param repId Rep id.
+ */
+ public PartValue(int id, int repId) {
+ this.id = id;
+ this.repId = repId;
+ }
+ }
+
+ /**
+ * Value for REPLICATED cache.
+ */
+ public static class RepValue {
+ /** Id. */
+ @QuerySqlField
+ private int id;
+
+ /**
+ * @param id Id.
+ */
+ public RepValue(int id) {
+ this.id = id;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3bbc984/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 4d8016b..cb6426d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -90,6 +90,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheP
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedSnapshotEnabledQuerySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryAbstractDistributedJoinSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNoRebalanceSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryROSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQuerySelfTest;
@@ -299,6 +300,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheReplicatedFieldsQuerySelfTest.class);
suite.addTestSuite(IgniteCacheReplicatedFieldsQueryROSelfTest.class);
suite.addTestSuite(IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest.class);
+ suite.addTestSuite(IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest.class);
suite.addTestSuite(IgniteCachePartitionedFieldsQuerySelfTest.class);
suite.addTestSuite(IgniteCacheAtomicFieldsQuerySelfTest.class);
suite.addTestSuite(IgniteCacheAtomicNearEnabledFieldsQuerySelfTest.class);