You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/13 16:08:04 UTC
ignite git commit: Ignite-1913
Repository: ignite
Updated Branches:
refs/heads/ignite-1913 [created] 3fe6cce3e
Ignite-1913
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3fe6cce3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3fe6cce3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3fe6cce3
Branch: refs/heads/ignite-1913
Commit: 3fe6cce3ec2db2e0f2789fad922c335d24240ea1
Parents: 3a8c19e
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Nov 13 18:07:50 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Nov 13 18:07:50 2015 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 76 ++++++--
.../GridDhtPartitionsExchangeFuture.java | 3 +-
.../GridDhtPartitionsSingleMessage.java | 25 ++-
...cingDelayedPartitionMapExchangeSelfTest.java | 177 +++++++++++++++++++
4 files changed, 267 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fe6cce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 81ff028..f93c40f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -713,12 +713,35 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @return {@code True} if message was sent, {@code false} if node left grid.
*/
private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) {
- GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
+ boolean retry = false;
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal() && cacheCtx.started())
- m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+ GridDhtPartitionsFullMessage m;
+
+ do {
+ AffinityTopologyVersion topVer = cctx.exchange().topologyVersion();
+
+ m = new GridDhtPartitionsFullMessage(null, null, topVer);
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ cacheCtx.topology().readLock();
+
+ try {
+ if (!cacheCtx.topology().topologyVersion().equals(topVer)) {
+ retry = true;
+
+ break;
+ }
+
+ if (!cacheCtx.isLocal() && cacheCtx.started())
+ m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+
+ }
+ finally {
+ cacheCtx.topology().readUnlock();
+ }
+ }
}
+ while (retry);
// It is important that client topologies be added after contexts.
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
@@ -749,17 +772,38 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param id ID.
*/
private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
- GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
- cctx.kernalContext().clientNode(),
- cctx.versions().last());
+ boolean retry = false;
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal()) {
- GridDhtPartitionMap locMap = cacheCtx.topology().localPartitionMap();
+ GridDhtPartitionsSingleMessage m;
- m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
+ do {
+ AffinityTopologyVersion topVer = cctx.exchange().topologyVersion();
+
+ m = new GridDhtPartitionsSingleMessage(id,
+ cctx.kernalContext().clientNode(),
+ cctx.versions().last(),
+ topVer);
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (!cacheCtx.isLocal()) {
+ cacheCtx.topology().readLock();
+
+ try {
+ if (!cacheCtx.topology().topologyVersion().equals(topVer)) {
+ retry = true;
+
+ break;
+ }
+
+ m.addLocalPartitionMap(cacheCtx.cacheId(), cacheCtx.topology().localPartitionMap());
+ }
+ finally {
+ cacheCtx.topology().readUnlock();
+ }
+ }
}
}
+ while (retry);
for (GridClientPartitionTopology top : clientTops.values()) {
GridDhtPartitionMap locMap = top.localPartitionMap();
@@ -925,6 +969,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+ if (cacheCtx != null && cacheCtx.startTopologyVersion() != null &&
+ msg.topologyVersion() != AffinityTopologyVersion.NONE && // Backward compatibility.
+ cacheCtx.startTopologyVersion().compareTo(msg.topologyVersion()) > 0)
+ continue;
+
if (cacheCtx != null && !cacheCtx.started())
continue; // Can safely ignore background exchange.
@@ -971,6 +1020,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+ if (cacheCtx != null && cacheCtx.startTopologyVersion() != null &&
+ msg.topologyVersion() != AffinityTopologyVersion.NONE && // Backward compatibility.
+ cacheCtx.startTopologyVersion().compareTo(msg.topologyVersion()) > 0)
+ continue;
+
GridDhtPartitionTopology top = null;
if (cacheCtx == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fe6cce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2f2944d..4fc4704 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -959,7 +959,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException {
GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
clientOnlyExchange,
- cctx.versions().last());
+ cctx.versions().last(),
+ id.topologyVersion());
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal())
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fe6cce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 83fbb1a..bb6ea27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -23,12 +23,14 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -49,6 +51,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** */
private boolean client;
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
/**
* Required by {@link Externalizable}.
*/
@@ -63,10 +68,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
*/
public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
boolean client,
- @Nullable GridCacheVersion lastVer) {
+ @Nullable GridCacheVersion lastVer,
+ @NotNull AffinityTopologyVersion topVer) {
super(exchId, lastVer);
this.client = client;
+ this.topVer = topVer;
}
/**
@@ -140,6 +147,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
writer.incrementState();
+ case 7:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -172,6 +185,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
reader.incrementState();
+ case 7:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridDhtPartitionsSingleMessage.class);
@@ -184,7 +205,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fe6cce3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
new file mode 100644
index 0000000..e275a49
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ * Delays exchange-less {@link GridDhtPartitionsFullMessage}s across a cache re-create and checks topology still advances.
+ */
+public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends GridCommonAbstractTest {
+ /** IP finder instance; not referenced by any other code in this class. */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Partitioned cache name ({@code null}; presumably selects the default cache - TODO confirm). */
+ protected static String CACHE = null;
+
+ /** Held-back sends keyed by destination node ID; only the first capture per node is stored. */
+ private final ConcurrentHashMap8<UUID, Runnable> rs = new ConcurrentHashMap8<>();
+
+ /** Enables capturing in {@link DelayableCommunicationSpi}; set once by the test and never reset. */
+ private volatile boolean record = false;
+
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ TcpCommunicationSpi commSpi = new DelayableCommunicationSpi(); // Every grid gets the capturing SPI.
+
+ commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+ commSpi.setTcpNoDelay(true);
+
+ iCfg.setCommunicationSpi(commSpi);
+
+ return iCfg;
+ }
+
+ /**
+ * Captures {@link GridDhtPartitionsFullMessage}s with a {@code null} exchange ID while {@code record} is set.
+ */
+ public class DelayableCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(final ClusterNode node, final Message msg,
+ final IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
+ final Object msg0 = ((GridIoMessage)msg).message(); // Unconditional cast: assumes msg is always a GridIoMessage.
+
+ if (msg0 instanceof GridDhtPartitionsFullMessage && record &&
+ ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) {
+ rs.putIfAbsent(node.id(), new Runnable() { // Not sent now; a later match for the same node is dropped.
+ @Override public void run() {
+ DelayableCommunicationSpi.super.sendMessage(node, msg, ackClosure);
+ }
+ });
+ }
+ else
+ try {
+ super.sendMessage(node, msg, ackClosure);
+ }
+ catch (Exception e) { // Send failures are passed to U.log and not rethrown.
+ U.log(null, e);
+ }
+
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void test() throws Exception {
+ startGrid(0);
+
+ CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+
+ cfg.setName(CACHE);
+ cfg.setCacheMode(CacheMode.PARTITIONED);
+ cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cfg.setBackups(1);
+
+ ignite(0).getOrCreateCache(cfg);
+
+ startGrid(1);
+ startGrid(2);
+ startGrid(3);
+
+ awaitPartitionMapExchange(true);
+
+ for (int i = 0; i < 2; i++) { // Restart grid 3 twice, awaiting exchange after each stop and start.
+ stopGrid(3);
+
+ awaitPartitionMapExchange(true);
+
+ startGrid(3);
+
+ awaitPartitionMapExchange(true);
+ }
+
+ assert rs.isEmpty(); // Nothing may be captured before recording is switched on.
+
+ record = true; // From here on, matching full messages are held back instead of sent.
+
+ while (rs.size() < 3) { // N - 1 nodes. NOTE(review): no timeout, spins until 3 destinations are captured.
+ U.sleep(10);
+ }
+
+ ignite(0).destroyCache(CACHE); // Re-create the cache while the captured messages are still held back.
+
+ ignite(0).getOrCreateCache(cfg);
+
+ awaitPartitionMapExchange();
+
+ for (Runnable r : rs.values()) { // Release the messages captured before the cache was re-created.
+ r.run();
+ }
+
+ U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages.
+
+ stopGrid(3); // Forces exchange at all nodes.
+
+ awaitPartitionMapExchange();
+
+ long topVer1 = grid(1).context().cache().internalCache(CACHE).context().affinity().affinityTopologyVersion() // Baseline, grid 1.
+ .topologyVersion();
+ long topVer2 = grid(2).context().cache().internalCache(CACHE).context().affinity().affinityTopologyVersion() // Baseline, grid 2.
+ .topologyVersion();
+
+ stopGrid(0); // Should force exchange.
+
+ awaitPartitionMapExchange();
+
+ // Will fail in case exchange-workers are dead: both remaining grids must have moved past their baseline version.
+ assert grid(1).context().cache().internalCache(CACHE).context().affinity().affinityTopologyVersion()
+ .topologyVersion() > topVer1;
+ assert grid(2).context().cache().internalCache(CACHE).context().affinity().affinityTopologyVersion()
+ .topologyVersion() > topVer2;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids(); // Stop every grid started by the test.
+ }
+
+}