You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/11/21 20:27:04 UTC
[2/2] ignite git commit: IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way
IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2d54d274
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2d54d274
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2d54d274
Branch: refs/heads/ignite-4242
Commit: 2d54d274aa804d8e445887fbe8875ed3bc0d4eab
Parents: 0086122
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Nov 21 23:26:49 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Nov 21 23:26:49 2016 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 12 +++--
.../dht/preloader/GridDhtPartitionDemander.java | 51 ++++++++------------
2 files changed, 27 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d54d274/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index dd0256a..75aeeda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1569,12 +1569,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (Integer cacheId : orderMap.get(order)) {
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
- Runnable cur = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
+ GridDhtPreloaderAssignments assignments = assignsMap.get(cacheId);
+
+ Runnable cur = cacheCtx.preloader().addAssignments(assignments,
forcePreload,
cnt,
r);
- if (cur != null)
+ if (cur != null && !assignments.isEmpty())
rebList.add(cacheCtx.name());
rebFuts.add(cacheCtx.preloader().rebalanceFuture());
@@ -1588,9 +1590,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
fut.get();
if (r != null) {
- Collections.reverse(rebList);
+ if (!rebList.isEmpty()) {
+ Collections.reverse(rebList);
- U.log(log, "Cache rebalancing scheduled: [order=" + rebList + "]");
+ U.log(log, "Cache rebalancing scheduled: [order=" + rebList + "]");
+ }
if (futQ.isEmpty()) {
U.log(log, "Rebalancing required " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d54d274/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 41b7bc1..fadec5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -287,25 +287,37 @@ public class GridDhtPartitionDemander {
}
});
+ if (assigns.isEmpty()) {
+ U.log(log, "Rebalancing is not required.");
+
+ ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+ }
+
return new Runnable() {
@Override public void run() {
+ if (assigns.isEmpty()) {
+ rebalanceFut.onDone(true); // Starts next cache preloading (according to order).
+
+ return;
+ }
+
try {
- requestPartitions(fut, assigns);
+ requestPartitions(rebalanceFut, assigns);
}
- catch (ClusterTopologyCheckedException e){
+ catch (ClusterTopologyCheckedException e) {
log.warning("Failed to send initial demand request to node.", e);
- fut.cancel();
+ rebalanceFut.cancel();
}
catch (IgniteCheckedException e) {
log.error("Failed to send initial demand request to node.", e);
- fut.cancel();
+ rebalanceFut.cancel();
}
catch (Throwable th) {
log.error("Runtime error caught during initial demand request sending.", th);
- fut.cancel();
+ rebalanceFut.cancel();
if (th instanceof Error)
throw th;
@@ -351,12 +363,6 @@ public class GridDhtPartitionDemander {
RebalanceFuture fut,
GridDhtPreloaderAssignments assigns
) throws IgniteCheckedException {
- if (assigns.isEmpty()) {
- fut.doneIfEmpty(assigns.cancelled());
-
- return;
- }
-
if (topologyChanged(fut)) {
fut.cancel();
@@ -382,7 +388,7 @@ public class GridDhtPartitionDemander {
//Check remote node rebalancing API version.
if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
- U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+ U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() +
", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
@@ -841,24 +847,6 @@ public class GridDhtPartitionDemander {
}
/**
- * @param cancelled Is cancelled.
- */
- private void doneIfEmpty(boolean cancelled) {
- synchronized (this) {
- if (isDone())
- return;
-
- assert remaining.isEmpty();
-
- if (log.isDebugEnabled())
- log.debug("Rebalancing is not required [cache=" + cctx.name() +
- ", topology=" + topVer + "]");
-
- checkIsDone(cancelled, true);
- }
- }
-
- /**
* Cancels this future.
*
* @return {@code True}.
@@ -981,8 +969,7 @@ public class GridDhtPartitionDemander {
if (parts.isEmpty()) {
U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
- "rebalancing [cache=" + cctx.name() +
- ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+ "rebalancing [fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - t.get1()) + " ms]"));
remaining.remove(nodeId);