You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/03 08:47:05 UTC
[7/8] ignite git commit: ignite-1027 Fixed early rebalance sync future completion.
ignite-1027 Fixed early rebalance sync future completion.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad9e4db5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad9e4db5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad9e4db5
Branch: refs/heads/ignite-1537
Commit: ad9e4db5b87b064d13db4f9251c25efd535fb9e8
Parents: 9b60c75
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 3 10:45:30 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 3 10:45:30 2015 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 17 ++--
.../processors/cache/GridCachePreloader.java | 11 ++-
.../dht/preloader/GridDhtPartitionDemander.java | 41 ++++----
.../dht/preloader/GridDhtPreloader.java | 4 +-
.../preloader/GridDhtPreloaderAssignments.java | 19 +++-
.../dht/GridCacheDhtPreloadDelayedSelfTest.java | 37 +++++---
...cingDelayedPartitionMapExchangeSelfTest.java | 9 +-
.../GridCacheRebalancingAsyncSelfTest.java | 3 +-
.../GridCacheRebalancingSyncCheckDataTest.java | 98 ++++++++++++++++++++
.../GridCacheRebalancingSyncSelfTest.java | 55 +++++------
...eRebalancingUnmarshallingFailedSelfTest.java | 6 +-
.../testsuites/IgniteCacheTestSuite3.java | 2 +
12 files changed, 223 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index b13a5af..a0f7f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1399,8 +1399,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
waitList.add(cctx.cacheContext(cId).name());
}
- Callable<Boolean> r = cacheCtx.preloader().addAssignments(
- assignsMap.get(cacheId), forcePreload, waitList, cnt);
+ Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
+ forcePreload,
+ waitList,
+ cnt);
if (r != null) {
U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
@@ -1425,7 +1427,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- if (marshR != null)
+ if (marshR != null) {
try {
marshR.call(); //Marshaller cache rebalancing launches in sync way.
}
@@ -1435,6 +1437,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
continue;
}
+ }
final GridFutureAdapter fut = new GridFutureAdapter();
@@ -1463,17 +1466,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
fut.onDone();
}
}
- }, /*system pool*/ true);
+ }, /*system pool*/true);
}
- else
+ else {
U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ }
}
- else
+ else {
U.log(log, "Skipping rebalancing (nothing scheduled) " +
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ }
}
}
catch (IgniteInterruptedCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 8e1164b..c8fcb90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -86,9 +86,9 @@ public interface GridCachePreloader {
/**
* @param exchFut Exchange future to assign.
- * @return Assignments.
+ * @return Assignments or {@code null} if detected that there are pending exchanges.
*/
- public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut);
+ @Nullable public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut);
/**
* Adds assignments to preloader.
@@ -97,9 +97,12 @@ public interface GridCachePreloader {
* @param forcePreload Force preload flag.
* @param caches Rebalancing of these caches will be finished before this started.
* @param cnt Counter.
+ * @return Rebalancing closure.
*/
- public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
- Collection<String> caches, int cnt);
+ public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
+ boolean forcePreload,
+ Collection<String> caches,
+ int cnt);
/**
* @param p Preload predicate.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index eb9e97f..ced0d10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -251,6 +251,7 @@ public class GridDhtPartitionDemander {
/**
* @param name Cache name.
* @param fut Future.
+ * @throws IgniteCheckedException If failed.
*/
private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException {
if (log.isDebugEnabled())
@@ -283,7 +284,7 @@ public class GridDhtPartitionDemander {
* @param force {@code True} if dummy reassign.
* @param caches Rebalancing of these caches will be finished before this started.
* @param cnt Counter.
- * @throws IgniteCheckedException If failed.
+ * @return Rebalancing closure.
*/
Callable<Boolean> addAssignments(final GridDhtPreloaderAssignments assigns, boolean force,
final Collection<String> caches, int cnt) {
@@ -293,25 +294,24 @@ public class GridDhtPartitionDemander {
long delay = cctx.config().getRebalanceDelay();
if (delay == 0 || force) {
- assert assigns != null;
-
final RebalanceFuture oldFut = rebalanceFut;
final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt);
if (!oldFut.isInitial())
oldFut.cancel();
- else
+ else {
fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> future) {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
oldFut.onDone(fut.result());
}
});
+ }
rebalanceFut = fut;
if (assigns.isEmpty()) {
- fut.doneIfEmpty();
+ fut.doneIfEmpty(assigns.cancelled());
return null;
}
@@ -357,6 +357,9 @@ public class GridDhtPartitionDemander {
/**
* @param fut Future.
+ * @param assigns Assignments.
+ * @throws IgniteCheckedException If failed.
+ * @return
*/
private boolean requestPartitions(
RebalanceFuture fut,
@@ -370,7 +373,7 @@ public class GridDhtPartitionDemander {
GridDhtPartitionDemandMessage d = e.getValue();
- fut.appendPartitions(node.id(), d.partitions());//Future preparation.
+ fut.appendPartitions(node.id(), d.partitions()); //Future preparation.
}
for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
@@ -413,7 +416,8 @@ public class GridDhtPartitionDemander {
initD.timeout(cctx.config().getRebalanceTimeout());
synchronized (fut) {
- if (!fut.isDone())// Future can be already cancelled at this moment and all failovers happened.
+ if (!fut.isDone())
+ // Future can be already cancelled at this moment and all failovers happened.
// New requests will not be covered by failovers.
cctx.io().sendOrderedMessage(node,
rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
@@ -427,9 +431,12 @@ public class GridDhtPartitionDemander {
}
}
else {
- U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
- ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
- ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
+ U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() +
+ ", mode=" + cfg.getRebalanceMode() +
+ ", fromNode=" + node.id() +
+ ", partitionsCount=" + parts.size() +
+ ", topology=" + fut.topologyVersion() +
+ ", updateSeq=" + fut.updateSeq + "]");
d.timeout(cctx.config().getRebalanceTimeout());
d.workerId(0);//old api support.
@@ -832,9 +839,9 @@ public class GridDhtPartitionDemander {
}
/**
- *
+ * @param cancelled Is cancelled.
*/
- private void doneIfEmpty() {
+ private void doneIfEmpty(boolean cancelled) {
synchronized (this) {
if (isDone())
return;
@@ -845,14 +852,14 @@ public class GridDhtPartitionDemander {
log.debug("Rebalancing is not required [cache=" + cctx.name() +
", topology=" + topVer + "]");
- checkIsDone();
+ checkIsDone(cancelled);
}
}
/**
* Cancels this future.
*
- * @return {@code true}.
+ * @return {@code True}.
*/
@Override public boolean cancel() {
synchronized (this) {
@@ -860,7 +867,7 @@ public class GridDhtPartitionDemander {
return true;
U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
- + ", topology=" + topologyVersion());
+ + ", topology=" + topologyVersion() + ']');
if (!cctx.kernalContext().isStopping()) {
for (UUID nodeId : remaining.keySet())
@@ -1012,7 +1019,7 @@ public class GridDhtPartitionDemander {
preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
if (log.isDebugEnabled())
- log.debug("Completed rebalance future.");
+ log.debug("Completed rebalance future: " + this);
cctx.shared().exchange().scheduleResendPartitions();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 3e3cee3..9a6246f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -324,7 +324,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
exchFut.exchangeId());
- break;
+ assigns.cancelled(true);
+
+ return assigns;
}
// If partition belongs to local node.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 3583967..3f82c9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -37,19 +37,36 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode,
/** Last join order. */
private final AffinityTopologyVersion topVer;
+ /** */
+ private boolean cancelled;
+
/**
* @param exchFut Exchange future.
* @param topVer Last join order.
*/
public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture exchFut, AffinityTopologyVersion topVer) {
assert exchFut != null;
- assert topVer.topologyVersion() > 0;
+ assert topVer.topologyVersion() > 0 : topVer;
this.exchFut = exchFut;
this.topVer = topVer;
}
/**
+ * @return {@code True} if assignments creation was cancelled.
+ */
+ public boolean cancelled() {
+ return cancelled;
+ }
+
+ /**
+ * @param cancelled {@code True} if assignments creation was cancelled.
+ */
+ public void cancelled(boolean cancelled) {
+ this.cancelled = cancelled;
+ }
+
+ /**
* @return Exchange future.
*/
GridDhtPartitionsExchangeFuture exchangeFuture() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
index 9d6e82f..0b610f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CachePeekMode;
@@ -35,7 +34,6 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
@@ -51,6 +49,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
@@ -107,7 +106,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest {
stopAllGrids();
}
- /** @throws Exception If failed. */
+ /**
+ * @throws Exception If failed.
+ */
public void testManualPreload() throws Exception {
delay = -1;
@@ -184,7 +185,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest {
checkCache(c2, cnt);
}
- /** @throws Exception If failed. */
+ /**
+ * @throws Exception If failed.
+ */
public void testDelayedPreload() throws Exception {
delay = PRELOAD_DELAY;
@@ -238,9 +241,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest {
checkMaps(false, d0, d1, d2);
- assert l1.await(PRELOAD_DELAY * 3 / 2, TimeUnit.MILLISECONDS);
+ assert l1.await(PRELOAD_DELAY * 3 / 2, MILLISECONDS);
- assert l2.await(PRELOAD_DELAY * 3 / 2, TimeUnit.MILLISECONDS);
+ assert l2.await(PRELOAD_DELAY * 3 / 2, MILLISECONDS);
U.sleep(1000);
@@ -253,7 +256,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest {
checkCache(c2, cnt);
}
- /** @throws Exception If failed. */
+ /**
+ * @throws Exception If failed.
+ */
public void testAutomaticPreload() throws Exception {
delay = 0;
preloadMode = CacheRebalanceMode.SYNC;
@@ -284,7 +289,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest {
checkCache(c2, cnt);
}
- /** @throws Exception If failed. */
+ /**
+ * @throws Exception If failed.
+ */
public void testAutomaticPreloadWithEmptyCache() throws Exception {
preloadMode = SYNC;
@@ -331,7 +338,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest {
}
}
- /** @throws Exception If failed. */
+ /**
+ * @throws Exception If failed.
+ */
public void testManualPreloadSyncMode() throws Exception {
preloadMode = CacheRebalanceMode.SYNC;
delay = -1;
@@ -344,7 +353,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest {
}
}
- /** @throws Exception If failed. */
+ /**
+ * @throws Exception If failed.
+ */
public void testPreloadManyNodes() throws Exception {
delay = 0;
preloadMode = ASYNC;
@@ -419,9 +430,11 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest {
*
* @param strict Strict check flag.
* @param caches Maps to compare.
+ * @throws Exception If failed.
*/
- private void checkMaps(final boolean strict, final GridDhtCacheAdapter<String, Integer>... caches)
- throws IgniteInterruptedCheckedException {
+ @SafeVarargs
+ private final void checkMaps(final boolean strict, final GridDhtCacheAdapter<String, Integer>... caches)
+ throws Exception {
if (caches.length < 2)
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
index a1ea7ad..2890fcb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -73,20 +73,20 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
public class DelayableCommunicationSpi extends TcpCommunicationSpi {
/** {@inheritDoc} */
@Override public void sendMessage(final ClusterNode node, final Message msg,
- final IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
+ final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
final Object msg0 = ((GridIoMessage)msg).message();
if (msg0 instanceof GridDhtPartitionsFullMessage && record &&
((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) {
rs.putIfAbsent(node.id(), new Runnable() {
@Override public void run() {
- DelayableCommunicationSpi.super.sendMessage(node, msg, ackClosure);
+ DelayableCommunicationSpi.super.sendMessage(node, msg, ackC);
}
});
}
else
try {
- super.sendMessage(node, msg, ackClosure);
+ super.sendMessage(node, msg, ackC);
}
catch (Exception e) {
U.log(null, e);
@@ -144,9 +144,8 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
awaitPartitionMapExchange();
- for (Runnable r : rs.values()) {
+ for (Runnable r : rs.values())
r.run();
- }
U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
index 7759c70..bcda0da 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
@@ -33,9 +33,8 @@ public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncS
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration iCfg = super.getConfiguration(gridName);
- for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration()) {
+ for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration())
cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
- }
return iCfg;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java
new file mode 100644
index 0000000..5e4a5c4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+
+/**
+ * Checks that a node joining a REPLICATED cache with SYNC rebalance mode has every key available locally right after start.
+ */
+public class GridCacheRebalancingSyncCheckDataTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every node started by this test. */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+ ccfg.setCacheMode(REPLICATED);
+ ccfg.setRebalanceMode(SYNC);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDataRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ final int KEYS = 10_000;
+
+ IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ for (int i = 0; i < KEYS; i++)
+ cache.put(i, i);
+
+ // Each iteration starts 5 nodes concurrently; every node must hold all keys locally and is closed on exit from the try block.
+ for (int i = 0; i < 3; i++) {
+ log.info("Iteration: " + i);
+
+ final AtomicInteger idx = new AtomicInteger(1);
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ try(Ignite ignite = startGrid(idx.getAndIncrement())) {
+ IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ for (int i = 0; i < KEYS; i++)
+ assertNotNull(cache.localPeek(i));
+ }
+
+ return null;
+ }
+ }, 5, "start-node");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 8c5cd40..3b25bd7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -45,19 +45,19 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** */
- private static int TEST_SIZE = 100_000;
+ private static final int TEST_SIZE = 100_000;
/** partitioned cache name. */
- protected static String CACHE_NAME_DHT_PARTITIONED = "cacheP";
+ protected static final String CACHE_NAME_DHT_PARTITIONED = "cacheP";
/** partitioned cache 2 name. */
- protected static String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2";
+ protected static final String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2";
/** replicated cache name. */
- protected static String CACHE_NAME_DHT_REPLICATED = "cacheR";
+ protected static final String CACHE_NAME_DHT_REPLICATED = "cacheR";
/** replicated cache 2 name. */
- protected static String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
+ protected static final String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
/** */
private volatile boolean concurrentStartFinished;
@@ -122,6 +122,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
/**
* @param ignite Ignite.
+ * @param from Start from key.
+ * @param iter Iteration.
*/
protected void generateData(Ignite ignite, int from, int iter) {
generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter);
@@ -132,6 +134,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
/**
* @param ignite Ignite.
+ * @param name Cache name.
+ * @param from Start from key.
+ * @param iter Iteration.
*/
protected void generateData(Ignite ignite, String name, int from, int iter) {
for (int i = from; i < from + TEST_SIZE; i++) {
@@ -144,9 +149,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
/**
* @param ignite Ignite.
- * @throws IgniteCheckedException Exception.
+ * @param from Start from key.
+ * @param iter Iteration.
*/
- protected void checkData(Ignite ignite, int from, int iter) throws IgniteCheckedException {
+ protected void checkData(Ignite ignite, int from, int iter) {
checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter);
checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter);
checkData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter);
@@ -155,10 +161,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
/**
* @param ignite Ignite.
+ * @param from Start from key.
+ * @param iter Iteration.
* @param name Cache name.
- * @throws IgniteCheckedException Exception.
*/
- protected void checkData(Ignite ignite, String name, int from, int iter) throws IgniteCheckedException {
+ protected void checkData(Ignite ignite, String name, int from, int iter) {
for (int i = from; i < from + TEST_SIZE; i++) {
if (i % (TEST_SIZE / 10) == 0)
log.info("<" + name + "> Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
@@ -169,7 +176,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception Exception
+ * @throws Exception If failed.
*/
public void testSimpleRebalancing() throws Exception {
Ignite ignite = startGrid(0);
@@ -206,7 +213,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception Exception
+ * @throws Exception If failed.
*/
public void testLoadRebalancing() throws Exception {
final Ignite ignite = startGrid(0);
@@ -240,14 +247,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
Thread t2 = new Thread() {
@Override public void run() {
- while (!concurrentStartFinished) {
- try {
- checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
- }
- catch (IgniteCheckedException e) {
- e.printStackTrace();
- }
- }
+ while (!concurrentStartFinished)
+ checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
}
};
@@ -282,7 +283,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
* @param id Node id.
* @param major Major ver.
* @param minor Minor ver.
- * @throws IgniteCheckedException Exception.
+ * @throws IgniteCheckedException If failed.
*/
protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException {
waitForRebalancing(id, new AffinityTopologyVersion(major, minor));
@@ -291,7 +292,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
/**
* @param id Node id.
* @param major Major ver.
- * @throws IgniteCheckedException Exception.
+ * @throws IgniteCheckedException If failed.
*/
protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
waitForRebalancing(id, new AffinityTopologyVersion(major));
@@ -300,7 +301,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
/**
* @param id Node id.
* @param top Topology version.
- * @throws IgniteCheckedException
+ * @throws IgniteCheckedException If failed.
*/
protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException {
boolean finished = false;
@@ -327,6 +328,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
/**
*
*/
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
protected void checkSupplyContextMapIsEmpty() {
for (Ignite g : G.allGrids()) {
for (GridCacheAdapter c : ((IgniteEx)g).context().cache().internalCaches()) {
@@ -342,12 +344,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
}
}
+ /** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 5 * 60_000;
}
/**
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testComplexRebalancing() throws Exception {
final Ignite ignite = startGrid(0);
@@ -368,9 +371,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
startGrid(1);
startGrid(2);
- while (!concurrentStartFinished2) {
+ while (!concurrentStartFinished2)
U.sleep(10);
- }
waitForRebalancing(0, 5, 0);
waitForRebalancing(1, 5, 0);
@@ -387,9 +389,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
grid(0).getOrCreateCache(cacheRCfg);
- while (!concurrentStartFinished3) {
+ while (!concurrentStartFinished3)
U.sleep(10);
- }
concurrentStartFinished = true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
index 506f1c2..7e35906 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
@@ -125,9 +125,8 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
startGrid(0);
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 100; i++)
grid(0).cache(CACHE).put(new TestKey(String.valueOf(i)), i);
- }
readCnt.set(1);
@@ -135,9 +134,8 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
readCnt.set(Integer.MAX_VALUE);
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 50; i++)
assert grid(1).cache(CACHE).get(new TestKey(String.valueOf(i))) != null;
- }
stopGrid(0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index b02d022..176ab3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePut
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRabalancingDelayedPartitionMapExchangeSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncCheckDataTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest;
@@ -140,6 +141,7 @@ public class IgniteCacheTestSuite3 extends TestSuite {
suite.addTestSuite(GridCacheOrderedPreloadingSelfTest.class);
suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class);
+ suite.addTestSuite(GridCacheRebalancingSyncCheckDataTest.class);
suite.addTestSuite(GridCacheRebalancingUnmarshallingFailedSelfTest.class);
suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
suite.addTestSuite(GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class);