You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/09 11:27:25 UTC
[03/25] ignite git commit: IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way
IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6ba1711a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6ba1711a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6ba1711a
Branch: refs/heads/ignite-4371
Commit: 6ba1711a1fa10d8276974227491136070c3ed43a
Parents: acf20b3
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Dec 6 12:55:41 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Dec 6 12:55:41 2016 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 128 +++--------
.../processors/cache/GridCachePreloader.java | 11 +-
.../cache/GridCachePreloaderAdapter.java | 5 +-
.../dht/preloader/GridDhtPartitionDemander.java | 230 +++++++++++--------
.../dht/preloader/GridDhtPreloader.java | 9 +-
.../GridCacheRebalancingSyncSelfTest.java | 2 +
6 files changed, 183 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ba1711a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7a24aa1..f04a6ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,18 +21,18 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Queue;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingDeque;
@@ -87,7 +87,6 @@ import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
@@ -97,13 +96,11 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.getLong;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -156,9 +153,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private GridFutureAdapter<?> reconnectExchangeFut;
- /** */
- private final Queue<Callable<Boolean>> rebalanceQ = new ConcurrentLinkedDeque8<>();
-
/**
* Partition map futures.
* This set also contains already completed exchange futures to address race conditions when coordinator
@@ -1596,12 +1590,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
long timeout = cctx.gridConfig().getNetworkTimeout();
- boolean startEvtFired = false;
-
int cnt = 0;
- IgniteInternalFuture asyncStartFut = null;
-
while (!isCancelled()) {
GridDhtPartitionsExchangeFuture exchFut = null;
@@ -1703,20 +1693,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
continue;
changed |= cacheCtx.topology().afterExchange(exchFut);
-
- // Preload event notification.
- if (!exchFut.skipPreload() && cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) {
- if (!cacheCtx.isReplicated() || !startEvtFired) {
- DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
- cacheCtx.events().addPreloadEvent(-1, EVT_CACHE_REBALANCE_STARTED,
- discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
- }
- }
}
- startEvtFired = true;
-
if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty())
refreshPartitions();
}
@@ -1755,8 +1733,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (assignsMap != null) {
int size = assignsMap.size();
- rebalanceQ.clear();
-
NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
@@ -1772,101 +1748,65 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
orderMap.get(order).add(cacheId);
}
- Callable<Boolean> marshR = null;
- List<Callable<Boolean>> orderedRs = new ArrayList<>(size);
+ Runnable r = null;
+
+ List<String> rebList = new LinkedList<>();
+
+ boolean assignsCancelled = false;
- //Ordered rebalance scheduling.
- for (Integer order : orderMap.keySet()) {
+ for (Integer order : orderMap.descendingKeySet()) {
for (Integer cacheId : orderMap.get(order)) {
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
- List<String> waitList = new ArrayList<>(size - 1);
+ GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
- for (List<Integer> cIds : orderMap.headMap(order).values()) {
- for (Integer cId : cIds)
- waitList.add(cctx.cacheContext(cId).name());
- }
+ if (assigns != null)
+ assignsCancelled |= assigns.cancelled();
- Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
+ // Cancels previous rebalance future (in case it's not done yet).
+ // Sends previous rebalance stopped event (if necessary).
+ // Creates new rebalance future.
+ // Sends current rebalance started event (if necessary).
+ // Finishes cache sync future (on empty assignments).
+ Runnable cur = cacheCtx.preloader().addAssignments(assigns,
forcePreload,
- waitList,
- cnt);
+ cnt,
+ r);
- if (r != null) {
- U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
- ", waitList=" + waitList.toString() + "]");
+ if (cur != null) {
+ rebList.add(U.maskName(cacheCtx.name()));
- if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
- marshR = r;
- else
- orderedRs.add(r);
+ r = cur;
}
}
}
- if (asyncStartFut != null)
- asyncStartFut.get(); // Wait for thread stop.
+ if (assignsCancelled) { // Pending exchange.
+ U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ }
+ else if (r != null) {
+ Collections.reverse(rebList);
- rebalanceQ.addAll(orderedRs);
+ U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
- if (marshR != null || !rebalanceQ.isEmpty()) {
if (futQ.isEmpty()) {
- U.log(log, "Rebalancing required " +
+ U.log(log, "Rebalancing started " +
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- if (marshR != null) {
- try {
- marshR.call(); //Marshaller cache rebalancing launches in sync way.
- }
- catch (Exception ex) {
- if (log.isDebugEnabled())
- log.debug("Failed to send initial demand request to node");
-
- continue;
- }
- }
-
- final GridFutureAdapter fut = new GridFutureAdapter();
-
- asyncStartFut = fut;
-
- cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
- @Override public Boolean call() {
- try {
- while (true) {
- Callable<Boolean> r = rebalanceQ.poll();
-
- if (r == null)
- return false;
-
- if (!r.call())
- return false;
- }
- }
- catch (Exception ex) {
- if (log.isDebugEnabled())
- log.debug("Failed to send initial demand request to node");
-
- return false;
- }
- finally {
- fut.onDone();
- }
- }
- }, /*system pool*/true);
+ r.run(); // Starts rebalancing routine.
}
- else {
+ else
U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- }
}
- else {
+ else
U.log(log, "Skipping rebalancing (nothing scheduled) " +
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- }
}
}
catch (IgniteInterruptedCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ba1711a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 1d1cfab..3c4456d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.UUID;
-import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -84,14 +83,14 @@ public interface GridCachePreloader {
*
* @param assignments Assignments to add.
* @param forcePreload Force preload flag.
- * @param caches Rebalancing of these caches will be finished before this started.
* @param cnt Counter.
- * @return Rebalancing closure.
+ * @param next Runnable responsible for cache rebalancing start.
+ * @return Rebalancing runnable.
*/
- public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
+ public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
boolean forcePreload,
- Collection<String> caches,
- int cnt);
+ int cnt,
+ Runnable next);
/**
* @param p Preload predicate.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ba1711a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index b15ebc5..656a960 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.UUID;
-import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -166,8 +165,8 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
- Collection<String> caches, int cnt) {
+ @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
+ int cnt, Runnable next) {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ba1711a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 57d5229..a6808c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -28,8 +28,8 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
@@ -72,6 +72,7 @@ import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
@@ -122,6 +123,18 @@ public class GridDhtPartitionDemander {
private final Map<Integer, Object> rebalanceTopics;
/**
+ * Started event sent.
+ * Make sense for replicated cache only.
+ */
+ private final AtomicBoolean startedEvtSent = new AtomicBoolean();
+
+ /**
+ * Stopped event sent.
+ * Make sense for replicated cache only.
+ */
+ private final AtomicBoolean stoppedEvtSent = new AtomicBoolean();
+
+ /**
* @param cctx Cctx.
* @param demandLock Demand lock.
*/
@@ -249,45 +262,25 @@ public class GridDhtPartitionDemander {
}
/**
- * @param name Cache name.
- * @param fut Future.
- * @throws IgniteCheckedException If failed.
+ * Sets last exchange future.
+ *
+ * @param lastFut Last future to set.
*/
- private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Waiting for another cache to start rebalancing [cacheName=" + cctx.name() +
- ", waitCache=" + name + ']');
-
- RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name)
- .preloader().rebalanceFuture();
-
- if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) {
- if (!wFut.get()) {
- U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() +
- "] (cache rebalanced with missed partitions)");
-
- return false;
- }
-
- return true;
- }
- else {
- U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() +
- "] (topology already changed)");
-
- return false;
- }
+ void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
+ lastExchangeFut = lastFut;
}
/**
* @param assigns Assignments.
* @param force {@code True} if dummy reassign.
- * @param caches Rebalancing of these caches will be finished before this started.
* @param cnt Counter.
- * @return Rebalancing closure.
+ * @param next Runnable responsible for cache rebalancing start.
+ * @return Rebalancing runnable.
*/
- Callable<Boolean> addAssignments(final GridDhtPreloaderAssignments assigns, boolean force,
- final Collection<String> caches, int cnt) {
+ Runnable addAssignments(final GridDhtPreloaderAssignments assigns,
+ boolean force,
+ int cnt,
+ final Runnable next) {
if (log.isDebugEnabled())
log.debug("Adding partition assignments: " + assigns);
@@ -296,7 +289,7 @@ public class GridDhtPartitionDemander {
if (delay == 0 || force) {
final RebalanceFuture oldFut = rebalanceFut;
- final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt);
+ final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt);
if (!oldFut.isInitial())
oldFut.cancel();
@@ -310,20 +303,69 @@ public class GridDhtPartitionDemander {
rebalanceFut = fut;
- if (assigns.isEmpty()) {
- fut.doneIfEmpty(assigns.cancelled());
+ fut.sendRebalanceStartedEvent();
+
+ if (assigns.cancelled()) { // Pending exchange.
+ if (log.isDebugEnabled())
+ log.debug("Rebalancing skipped due to cancelled assignments.");
+
+ fut.onDone(false);
+
+ fut.sendRebalanceFinishedEvent();
+
+ return null;
+ }
+
+ if (assigns.isEmpty()) { // Nothing to rebalance.
+ if (log.isDebugEnabled())
+ log.debug("Rebalancing skipped due to empty assignments.");
+
+ fut.onDone(true);
+
+ ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+
+ fut.sendRebalanceFinishedEvent();
return null;
}
- return new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- for (String c : caches) {
- if (!waitForCacheRebalancing(c, fut))
- return false;
+ return new Runnable() {
+ @Override public void run() {
+ try {
+ if (next != null)
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> f) {
+ try {
+ if (f.get()) // Not cancelled.
+ next.run(); // Starts next cache rebalancing (according to the order).
+ }
+ catch (IgniteCheckedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug(ignored.getMessage());
+ }
+ }
+ });
+
+ requestPartitions(fut, assigns);
}
+ catch (IgniteCheckedException e) {
+ ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class);
- return requestPartitions(fut, assigns);
+ if (cause != null)
+ log.warning("Failed to send initial demand request to node. " + e.getMessage());
+ else
+ log.error("Failed to send initial demand request to node.", e);
+
+ fut.cancel();
+ }
+ catch (Throwable th) {
+ log.error("Runtime error caught during initial demand request sending.", th);
+
+ fut.cancel();
+
+ if (th instanceof Error)
+ throw th;
+ }
}
};
}
@@ -361,14 +403,17 @@ public class GridDhtPartitionDemander {
* @throws IgniteCheckedException If failed.
* @return Partitions were requested.
*/
- private boolean requestPartitions(
+ private void requestPartitions(
RebalanceFuture fut,
GridDhtPreloaderAssignments assigns
) throws IgniteCheckedException {
- for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
- if (topologyChanged(fut))
- return false;
+ if (topologyChanged(fut)) {
+ fut.cancel();
+
+ return;
+ }
+ for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
final ClusterNode node = e.getKey();
GridDhtPartitionDemandMessage d = e.getValue();
@@ -387,7 +432,7 @@ public class GridDhtPartitionDemander {
//Check remote node rebalancing API version.
if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
- U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+ U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() +
", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
@@ -446,8 +491,6 @@ public class GridDhtPartitionDemander {
worker.run(node, d);
}
}
-
- return true;
}
/**
@@ -739,23 +782,17 @@ public class GridDhtPartitionDemander {
}
/**
- * Sets last exchange future.
- *
- * @param lastFut Last future to set.
- */
- void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
- lastExchangeFut = lastFut;
- }
-
- /**
*
*/
public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
/** */
private static final long serialVersionUID = 1L;
- /** Should EVT_CACHE_REBALANCE_STOPPED event be sent of not. */
- private final boolean sndStoppedEvnt;
+ /** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */
+ private final AtomicBoolean startedEvtSent;
+
+ /** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */
+ private final AtomicBoolean stoppedEvtSent;
/** */
private final GridCacheContext<?, ?> cctx;
@@ -783,13 +820,15 @@ public class GridDhtPartitionDemander {
* @param assigns Assigns.
* @param cctx Context.
* @param log Logger.
- * @param sentStopEvnt Stop event flag.
+ * @param startedEvtSent Start event sent flag.
+ * @param stoppedEvtSent Stop event sent flag.
* @param updateSeq Update sequence.
*/
RebalanceFuture(GridDhtPreloaderAssignments assigns,
GridCacheContext<?, ?> cctx,
IgniteLogger log,
- boolean sentStopEvnt,
+ AtomicBoolean startedEvtSent,
+ AtomicBoolean stoppedEvtSent,
long updateSeq) {
assert assigns != null;
@@ -797,7 +836,8 @@ public class GridDhtPartitionDemander {
this.topVer = assigns.topologyVersion();
this.cctx = cctx;
this.log = log;
- this.sndStoppedEvnt = sentStopEvnt;
+ this.startedEvtSent = startedEvtSent;
+ this.stoppedEvtSent = stoppedEvtSent;
this.updateSeq = updateSeq;
}
@@ -809,7 +849,8 @@ public class GridDhtPartitionDemander {
this.topVer = null;
this.cctx = null;
this.log = null;
- this.sndStoppedEvnt = false;
+ this.startedEvtSent = null;
+ this.stoppedEvtSent = null;
this.updateSeq = -1;
}
@@ -848,24 +889,6 @@ public class GridDhtPartitionDemander {
}
/**
- * @param cancelled Is cancelled.
- */
- private void doneIfEmpty(boolean cancelled) {
- synchronized (this) {
- if (isDone())
- return;
-
- assert remaining.isEmpty();
-
- if (log.isDebugEnabled())
- log.debug("Rebalancing is not required [cache=" + cctx.name() +
- ", topology=" + topVer + "]");
-
- checkIsDone(cancelled, true);
- }
- }
-
- /**
* Cancels this future.
*
* @return {@code True}.
@@ -875,8 +898,7 @@ public class GridDhtPartitionDemander {
if (isDone())
return true;
- U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
- + ", topology=" + topologyVersion() + ']');
+ U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']');
if (!cctx.kernalContext().isStopping()) {
for (UUID nodeId : remaining.keySet())
@@ -885,7 +907,7 @@ public class GridDhtPartitionDemander {
remaining.clear();
- checkIsDone(true /* cancelled */, false);
+ checkIsDone(true /* cancelled */);
}
return true;
@@ -907,7 +929,7 @@ public class GridDhtPartitionDemander {
remaining.remove(nodeId);
- onDone(false); // Finishing rebalance future a non completed.
+ onDone(false); // Finishing rebalance future as non completed.
checkIsDone(); // But will finish syncFuture only when other nodes are preloaded or rebalancing cancelled.
}
@@ -988,8 +1010,7 @@ public class GridDhtPartitionDemander {
if (parts.isEmpty()) {
U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
- "rebalancing [cache=" + cctx.name() +
- ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+ "rebalancing [fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - t.get1()) + " ms]"));
remaining.remove(nodeId);
@@ -1022,23 +1043,20 @@ public class GridDhtPartitionDemander {
*
*/
private void checkIsDone() {
- checkIsDone(false, false);
+ checkIsDone(false);
}
/**
* @param cancelled Is cancelled.
- * @param wasEmpty {@code True} if future was created without assignments.
*/
- private void checkIsDone(boolean cancelled, boolean wasEmpty) {
+ private void checkIsDone(boolean cancelled) {
if (remaining.isEmpty()) {
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sndStoppedEvnt))
- preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+ sendRebalanceFinishedEvent();
if (log.isDebugEnabled())
log.debug("Completed rebalance future: " + this);
- if (!wasEmpty)
- cctx.shared().exchange().scheduleResendPartitions();
+ cctx.shared().exchange().scheduleResendPartitions();
Collection<Integer> m = new HashSet<>();
@@ -1064,6 +1082,30 @@ public class GridDhtPartitionDemander {
}
}
+ /**
+ *
+ */
+ private void sendRebalanceStartedEvent() {
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) &&
+ (!cctx.isReplicated() || !startedEvtSent.get())) {
+ preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
+
+ startedEvtSent.set(true);
+ }
+ }
+
+ /**
+ *
+ */
+ private void sendRebalanceFinishedEvent() {
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
+ (!cctx.isReplicated() || !stoppedEvtSent.get())) {
+ preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+
+ stoppedEvtSent.set(true);
+ }
+ }
+
/** {@inheritDoc} */
public String toString() {
return S.toString(RebalanceFuture.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ba1711a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 0865d9f..692e7c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
@@ -255,7 +254,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
@Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
supplier.onTopologyChanged(lastFut.topologyVersion());
- demander.updateLastExchangeFuture(lastFut);
+ demander.onTopologyChanged(lastFut);
}
/** {@inheritDoc} */
@@ -413,9 +412,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
- boolean forcePreload, Collection<String> caches, int cnt) {
- return demander.addAssignments(assignments, forcePreload, caches, cnt);
+ @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
+ boolean forcePreload, int cnt, Runnable next) {
+ return demander.addAssignments(assignments, forcePreload, cnt, next);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ba1711a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index de38952..3dfcd85 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -501,6 +501,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
record = true;
+ log.info("Checking GridDhtPartitions*Message absent (it will take 30 SECONDS) ... ");
+
U.sleep(30_000);
record = false;