You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/20 08:04:12 UTC

[04/50] [abbrv] ignite git commit: IGNITE-7166: SQL: fixed affinity nodes processing for REPLICATED caches. This closes #3202.

IGNITE-7166: SQL: fixed affinity nodes processing for REPLICATED caches. This closes #3202.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e3bbc984
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e3bbc984
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e3bbc984

Branch: refs/heads/ignite-zk
Commit: e3bbc984c1acd65ad9d822ad6fb634221d1a5a2d
Parents: 7cf049e
Author: devozerov <pp...@gmail.com>
Authored: Tue Dec 12 20:07:07 2017 +0300
Committer: devozerov <pp...@gmail.com>
Committed: Tue Dec 12 20:07:07 2017 +0300

----------------------------------------------------------------------
 .../processors/affinity/AffinityAssignment.java |   5 +
 .../affinity/GridAffinityAssignment.java        |  45 ++++--
 .../affinity/HistoryAffinityAssignment.java     |  15 ++
 .../h2/twostep/GridReduceQueryExecutor.java     |  44 +++---
 ...ldsQueryJoinNoPrimaryPartitionsSelfTest.java | 151 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 6 files changed, 223 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e3bbc984/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
index 06207d3..f78ab60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -66,6 +66,11 @@ public interface AffinityAssignment {
     public HashSet<UUID> getIds(int part);
 
     /**
+     * @return Nodes having parimary and backup assignments.
+     */
+    public Set<ClusterNode> nodes();
+
+    /**
      * @return Nodes having primary partitions assignments.
      */
     public Set<ClusterNode> primaryPartitionNodes();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3bbc984/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 35130a3..6da6aaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 /**
  * Cached affinity calculations.
  */
+@SuppressWarnings("ForLoopReplaceableByForEach")
 public class GridAffinityAssignment implements AffinityAssignment, Serializable {
     /** */
     private static final long serialVersionUID = 0L;
@@ -51,6 +52,9 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
     /** Assignment node IDs */
     private transient volatile List<HashSet<UUID>> assignmentIds;
 
+    /** Nodes having primary or backup partition assignments. */
+    private transient volatile Set<ClusterNode> nodes;
+
     /** Nodes having primary partitions assignments. */
     private transient volatile Set<ClusterNode> primaryPartsNodes;
 
@@ -182,29 +186,44 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
         return assignmentIds0.get(part);
     }
 
-    /**
-     * @return Nodes having primary partitions assignments.
-     */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    public Set<ClusterNode> primaryPartitionNodes() {
-        Set<ClusterNode> primaryPartsNodes0 = primaryPartsNodes;
+    /** {@inheritDoc} */
+    @Override public Set<ClusterNode> nodes() {
+        Set<ClusterNode> res = nodes;
 
-        if (primaryPartsNodes0 == null) {
-            int parts = assignment.size();
+        if (res == null) {
+            res = new HashSet<>();
+
+            for (int p = 0; p < assignment.size(); p++) {
+                List<ClusterNode> nodes = assignment.get(p);
+
+                if (nodes.size() > 0)
+                    res.addAll(nodes);
+            }
+
+            nodes = res;
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<ClusterNode> primaryPartitionNodes() {
+        Set<ClusterNode> res = primaryPartsNodes;
 
-            primaryPartsNodes0 = new HashSet<>();
+        if (res == null) {
+            res = new HashSet<>();
 
-            for (int p = 0; p < parts; p++) {
+            for (int p = 0; p < assignment.size(); p++) {
                 List<ClusterNode> nodes = assignment.get(p);
 
                 if (nodes.size() > 0)
-                    primaryPartsNodes0.add(nodes.get(0));
+                    res.add(nodes.get(0));
             }
 
-            primaryPartsNodes = primaryPartsNodes0;
+            primaryPartsNodes = res;
         }
 
-        return primaryPartsNodes0;
+        return res;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3bbc984/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
index e502dd5..94eaab4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 /**
  *
  */
+@SuppressWarnings("ForLoopReplaceableByForEach")
 public class HistoryAffinityAssignment implements AffinityAssignment {
     /** */
     private final AffinityTopologyVersion topVer;
@@ -97,6 +98,20 @@ public class HistoryAffinityAssignment implements AffinityAssignment {
     }
 
     /** {@inheritDoc} */
+    @Override public Set<ClusterNode> nodes() {
+        Set<ClusterNode> res = new HashSet<>();
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
+
+            if (!F.isEmpty(nodes))
+                res.addAll(nodes);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
     @Override public Set<ClusterNode> primaryPartitionNodes() {
         Set<ClusterNode> res = new HashSet<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3bbc984/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8e994aa..29e9b27 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -400,7 +400,7 @@ public class GridReduceQueryExecutor {
 
         // Explicit partitions mapping is not applicable to replicated cache.
         if (cctx.isReplicated()) {
-            for (ClusterNode clusterNode : cctx.affinity().assignment(topVer).primaryPartitionNodes())
+            for (ClusterNode clusterNode : cctx.affinity().assignment(topVer).nodes())
                 mapping.put(clusterNode, null);
 
             return mapping;
@@ -478,35 +478,27 @@ public class GridReduceQueryExecutor {
             if (F.isEmpty(extraNodes))
                 throw new CacheException("Failed to find data nodes for cache: " + extraCacheName);
 
-            if (isReplicatedOnly && extraCctx.isReplicated()) {
-                nodes.retainAll(extraNodes);
+            boolean disjoint;
 
-                if (map.isEmpty()) {
-                    if (isPreloadingActive(cacheIds))
-                        return null; // Retry.
-                    else
-                        throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
-                            ", cache2=" + extraCacheName + "]");
+            if (extraCctx.isReplicated()) {
+                if (isReplicatedOnly) {
+                    nodes.retainAll(extraNodes);
+
+                    disjoint = map.isEmpty();
                 }
-            }
-            else if (!isReplicatedOnly && extraCctx.isReplicated()) {
-                if (!extraNodes.containsAll(nodes))
-                    if (isPreloadingActive(cacheIds))
-                        return null; // Retry.
-                    else
-                        throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
-                            ", cache2=" + extraCacheName + "]");
-            }
-            else if (!isReplicatedOnly && !extraCctx.isReplicated()) {
-                if (!extraNodes.equals(nodes))
-                    if (isPreloadingActive(cacheIds))
-                        return null; // Retry.
-                    else
-                        throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
-                            ", cache2=" + extraCacheName + "]");
+                else
+                    disjoint = !extraNodes.containsAll(nodes);
             }
             else
-                throw new IllegalStateException();
+                disjoint = !extraNodes.equals(nodes);
+
+            if (disjoint) {
+                if (isPreloadingActive(cacheIds))
+                    return null; // Retry.
+                else
+                    throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
+                        ", cache2=" + extraCacheName + "]");
+            }
         }
 
         return map;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3bbc984/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest.java
new file mode 100644
index 0000000..a5b3706
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.replicated;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests non-collocated join with REPLICATED cache and no primary partitions for that cache on some nodes.
+ */
+@SuppressWarnings("unused")
+public class IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest extends GridCommonAbstractTest {
+    /** Client node name. */
+    public static final String NODE_CLI = "client";
+
+    /** */
+    public static final String CACHE_PARTITIONED = "partitioned";
+
+    /** */
+    public static final String CACHE_REPLICATED = "replicated";
+
+    /** */
+    public static final int REP_CNT = 3;
+
+    /** */
+    public static final int PART_CNT = 100;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setClientMode(F.eq(NODE_CLI, igniteInstanceName));
+
+        CacheConfiguration<Integer, PartValue> ccfg1 = new CacheConfiguration<>(CACHE_PARTITIONED);
+
+        ccfg1.setCacheMode(PARTITIONED);
+        ccfg1.setAtomicityMode(TRANSACTIONAL);
+        ccfg1.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg1.setIndexedTypes(Integer.class, PartValue.class);
+        ccfg1.setSqlSchema(QueryUtils.DFLT_SCHEMA);
+
+        CacheConfiguration<Integer, RepValue> ccfg2 = new CacheConfiguration<>(CACHE_REPLICATED);
+
+        ccfg2.setAffinity(new RendezvousAffinityFunction(false, REP_CNT));
+        ccfg2.setCacheMode(REPLICATED);
+        ccfg2.setAtomicityMode(TRANSACTIONAL);
+        ccfg2.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg2.setIndexedTypes(Integer.class, RepValue.class);
+        ccfg2.setSqlSchema(QueryUtils.DFLT_SCHEMA);
+
+        cfg.setCacheConfiguration(ccfg1, ccfg2);
+
+        return cfg;
+    }
+
+    /**{@inheritDoc}  */
+    @Override protected void beforeTest() throws Exception {
+        startGridsMultiThreaded(3);
+
+        Ignite cli = startGrid(NODE_CLI);
+
+        for (int i = 0; i < REP_CNT; i++)
+            cli.cache(CACHE_REPLICATED).put(i, new RepValue(i));
+
+        for (int i = 0; i < PART_CNT; i++)
+            cli.cache(CACHE_PARTITIONED).put(i, new PartValue(i, ((i + 1) % REP_CNT)));
+    }
+
+    /**{@inheritDoc}  */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Test non-colocated join.
+     *
+     * @throws Exception If failed.
+     */
+    public void testJoinNonCollocated() throws Exception {
+        SqlFieldsQuery qry = new SqlFieldsQuery("SELECT COUNT(*) FROM PartValue p, RepValue r WHERE p.repId=r.id");
+
+        long cnt = (Long)grid(NODE_CLI).cache(CACHE_PARTITIONED).query(qry).getAll().get(0).get(0);
+
+        assertEquals(PART_CNT, cnt);
+    }
+
+    /**
+     * Value for PARTITIONED cache.
+     */
+    public static class PartValue {
+        /** Id. */
+        @QuerySqlField
+        private int id;
+
+        /** Rep id. */
+        @QuerySqlField
+        private int repId;
+
+        /**
+         * @param id Id.
+         * @param repId Rep id.
+         */
+        public PartValue(int id, int repId) {
+            this.id = id;
+            this.repId = repId;
+        }
+    }
+
+    /**
+     * Value for REPLICATED cache.
+     */
+    public static class RepValue {
+        /** Id. */
+        @QuerySqlField
+        private int id;
+
+        /**
+         * @param id Id.
+         */
+        public RepValue(int id) {
+            this.id = id;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3bbc984/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 4d8016b..cb6426d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -90,6 +90,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheP
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedSnapshotEnabledQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryAbstractDistributedJoinSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNoRebalanceSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryROSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQuerySelfTest;
@@ -299,6 +300,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheReplicatedFieldsQuerySelfTest.class);
         suite.addTestSuite(IgniteCacheReplicatedFieldsQueryROSelfTest.class);
         suite.addTestSuite(IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest.class);
+        suite.addTestSuite(IgniteCacheReplicatedFieldsQueryJoinNoPrimaryPartitionsSelfTest.class);
         suite.addTestSuite(IgniteCachePartitionedFieldsQuerySelfTest.class);
         suite.addTestSuite(IgniteCacheAtomicFieldsQuerySelfTest.class);
         suite.addTestSuite(IgniteCacheAtomicNearEnabledFieldsQuerySelfTest.class);