You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/13 16:08:04 UTC

ignite git commit: Ignite-1913

Repository: ignite
Updated Branches:
  refs/heads/ignite-1913 [created] 3fe6cce3e


Ignite-1913


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3fe6cce3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3fe6cce3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3fe6cce3

Branch: refs/heads/ignite-1913
Commit: 3fe6cce3ec2db2e0f2789fad922c335d24240ea1
Parents: 3a8c19e
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Nov 13 18:07:50 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Nov 13 18:07:50 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  76 ++++++--
 .../GridDhtPartitionsExchangeFuture.java        |   3 +-
 .../GridDhtPartitionsSingleMessage.java         |  25 ++-
 ...cingDelayedPartitionMapExchangeSelfTest.java | 177 +++++++++++++++++++
 4 files changed, 267 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3fe6cce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 81ff028..f93c40f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -713,12 +713,35 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @return {@code True} if message was sent, {@code false} if node left grid.
      */
     private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) {
-        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
+        boolean retry = false;
 
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal() && cacheCtx.started())
-                m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+        GridDhtPartitionsFullMessage m;
+
+        do {
+            AffinityTopologyVersion topVer = cctx.exchange().topologyVersion();
+
+            m = new GridDhtPartitionsFullMessage(null, null, topVer);
+
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                cacheCtx.topology().readLock();
+
+                try {
+                    if (!cacheCtx.topology().topologyVersion().equals(topVer)) {
+                        retry = true;
+
+                        break;
+                    }
+
+                    if (!cacheCtx.isLocal() && cacheCtx.started())
+                        m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+
+                }
+                finally {
+                    cacheCtx.topology().readUnlock();
+                }
+            }
         }
+        while (retry);
 
         // It is important that client topologies be added after contexts.
         for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
@@ -749,17 +772,38 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param id ID.
      */
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
-        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
-            cctx.kernalContext().clientNode(),
-            cctx.versions().last());
+        boolean retry = false;
 
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal()) {
-                GridDhtPartitionMap locMap = cacheCtx.topology().localPartitionMap();
+        GridDhtPartitionsSingleMessage m;
 
-                m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
+        do {
+            AffinityTopologyVersion topVer = cctx.exchange().topologyVersion();
+
+            m = new GridDhtPartitionsSingleMessage(id,
+                cctx.kernalContext().clientNode(),
+                cctx.versions().last(),
+                topVer);
+
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                if (!cacheCtx.isLocal()) {
+                    cacheCtx.topology().readLock();
+
+                    try {
+                        if (!cacheCtx.topology().topologyVersion().equals(topVer)) {
+                            retry = true;
+
+                            break;
+                        }
+
+                        m.addLocalPartitionMap(cacheCtx.cacheId(), cacheCtx.topology().localPartitionMap());
+                    }
+                    finally {
+                        cacheCtx.topology().readUnlock();
+                    }
+                }
             }
         }
+        while (retry);
 
         for (GridClientPartitionTopology top : clientTops.values()) {
             GridDhtPartitionMap locMap = top.localPartitionMap();
@@ -925,6 +969,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
+                    if (cacheCtx != null && cacheCtx.startTopologyVersion() != null &&
+                        msg.topologyVersion() != AffinityTopologyVersion.NONE && // Backward compatibility.
+                        cacheCtx.startTopologyVersion().compareTo(msg.topologyVersion()) > 0)
+                        continue;
+
                     if (cacheCtx != null && !cacheCtx.started())
                         continue; // Can safely ignore background exchange.
 
@@ -971,6 +1020,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
+                    if (cacheCtx != null && cacheCtx.startTopologyVersion() != null &&
+                        msg.topologyVersion() != AffinityTopologyVersion.NONE && // Backward compatibility.
+                        cacheCtx.startTopologyVersion().compareTo(msg.topologyVersion()) > 0)
+                        continue;
+
                     GridDhtPartitionTopology top = null;
 
                     if (cacheCtx == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/3fe6cce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2f2944d..4fc4704 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -959,7 +959,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException {
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
             clientOnlyExchange,
-            cctx.versions().last());
+            cctx.versions().last(),
+            id.topologyVersion());
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal())

http://git-wip-us.apache.org/repos/asf/ignite/blob/3fe6cce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 83fbb1a..bb6ea27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -23,12 +23,14 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -49,6 +51,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** */
     private boolean client;
 
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -63,10 +68,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
      */
     public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
         boolean client,
-        @Nullable GridCacheVersion lastVer) {
+        @Nullable GridCacheVersion lastVer,
+        @NotNull AffinityTopologyVersion topVer) {
         super(exchId, lastVer);
 
         this.client = client;
+        this.topVer = topVer;
     }
 
     /**
@@ -140,6 +147,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 writer.incrementState();
 
+            case 7:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -172,6 +185,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
+            case 7:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDhtPartitionsSingleMessage.class);
@@ -184,7 +205,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3fe6cce3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
new file mode 100644
index 0000000..e275a49
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -0,0 +1,177 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ *
+ */
+public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends GridCommonAbstractTest {
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** partitioned cache name. */
+    protected static String CACHE = null;
+
+    /** */
+    private final ConcurrentHashMap8<UUID, Runnable> rs = new ConcurrentHashMap8<>();
+
+    /** */
+    private volatile boolean record = false;
+
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        TcpCommunicationSpi commSpi = new DelayableCommunicationSpi();
+
+        commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+        commSpi.setTcpNoDelay(true);
+
+        iCfg.setCommunicationSpi(commSpi);
+
+        return iCfg;
+    }
+
+    /**
+     * Helps to delay GridDhtPartitionsFullMessages.
+     */
+    public class DelayableCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(final ClusterNode node, final Message msg,
+            final IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
+            final Object msg0 = ((GridIoMessage)msg).message();
+
+            if (msg0 instanceof GridDhtPartitionsFullMessage && record &&
+                ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) {
+                    rs.putIfAbsent(node.id(), new Runnable() {
+                        @Override public void run() {
+                            DelayableCommunicationSpi.super.sendMessage(node, msg, ackClosure);
+                        }
+                    });
+            }
+            else
+                try {
+                    super.sendMessage(node, msg, ackClosure);
+                }
+                catch (Exception e) {
+                    U.log(null, e);
+                }
+
+        }
+    }
+
+    /**
+     * @throws Exception e.
+     */
+    public void test() throws Exception {
+        startGrid(0);
+
+        CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+
+        cfg.setName(CACHE);
+        cfg.setCacheMode(CacheMode.PARTITIONED);
+        cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cfg.setBackups(1);
+
+        ignite(0).getOrCreateCache(cfg);
+
+        startGrid(1);
+        startGrid(2);
+        startGrid(3);
+
+        awaitPartitionMapExchange(true);
+
+        for (int i = 0; i < 2; i++) {
+            stopGrid(3);
+
+            awaitPartitionMapExchange(true);
+
+            startGrid(3);
+
+            awaitPartitionMapExchange(true);
+        }
+
+        assert rs.isEmpty();
+
+        record = true;
+
+        while (rs.size() < 3) { // N - 1 nodes.
+            U.sleep(10);
+        }
+
+        ignite(0).destroyCache(CACHE);
+
+        ignite(0).getOrCreateCache(cfg);
+
+        awaitPartitionMapExchange();
+
+        for (Runnable r : rs.values()) {
+            r.run();
+        }
+
+        U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages.
+
+        stopGrid(3); // Forces exchange at all nodes.
+
+        awaitPartitionMapExchange();
+
+        long topVer1 = grid(1).context().cache().internalCache(CACHE).context().affinity().affinityTopologyVersion()
+            .topologyVersion();
+        long topVer2 = grid(2).context().cache().internalCache(CACHE).context().affinity().affinityTopologyVersion()
+            .topologyVersion();
+
+        stopGrid(0); // Should force exchange.
+
+        awaitPartitionMapExchange();
+
+        // Will fail in case exchange-workers are dead.
+        assert grid(1).context().cache().internalCache(CACHE).context().affinity().affinityTopologyVersion()
+            .topologyVersion() > topVer1;
+        assert grid(2).context().cache().internalCache(CACHE).context().affinity().affinityTopologyVersion()
+            .topologyVersion() > topVer2;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+}