You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ra...@apache.org on 2015/11/11 01:10:57 UTC
[42/50] [abbrv] ignite git commit: Ignite-1093 Logging & Backward compatibility failover fixes.
Ignite-1093 Logging & Backward compatibility failover fixes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/05a86a2b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/05a86a2b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/05a86a2b
Branch: refs/heads/ignite-1527
Commit: 05a86a2bd1f0e0905f24f78590d75131f8fbcef1
Parents: aa488d5
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Nov 10 16:14:15 2015 +0300
Committer: Raul Kripalani <ra...@apache.org>
Committed: Wed Nov 11 00:09:44 2015 +0000
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 34 +++++++-------------
.../dht/preloader/GridDhtPartitionDemander.java | 25 ++++++++++++--
2 files changed, 34 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/05a86a2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 479a0b6..5b4fee3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -617,13 +617,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @return {@code True} if topology has changed.
- */
- public boolean topologyChanged() {
- return exchWorker.topologyChanged();
- }
-
- /**
* @param exchFut Exchange future.
* @param reassign Dummy reassign flag.
*/
@@ -673,7 +666,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (log.isDebugEnabled())
log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']');
- Collection<ClusterNode> rmts = null;
+ Collection<ClusterNode> rmts;
// If this is the oldest node.
if (oldest.id().equals(cctx.localNodeId())) {
@@ -1362,7 +1355,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (marshR != null || !rebalanceQ.isEmpty()) {
if (futQ.isEmpty()) {
- U.log(log, "Starting caches rebalancing [top=" + exchFut.topologyVersion() + "]");
+ U.log(log, "Rebalancing required" +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().node().id() + ']');
if (marshR != null)
try {
@@ -1404,13 +1399,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}, /*system pool*/ true);
}
- else {
- U.log(log, "Obsolete exchange, skipping rebalancing [top=" + exchFut.topologyVersion() + "]");
- }
- }
- else {
- U.log(log, "Nothing scheduled, skipping rebalancing [top=" + exchFut.topologyVersion() + "]");
+ else
+ U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().node().id() + ']');
}
+ else
+ U.log(log, "Skipping rebalancing (nothing scheduled) " +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().node().id() + ']');
}
}
catch (IgniteInterruptedCheckedException e) {
@@ -1425,13 +1422,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
}
-
- /**
- * @return {@code True} if another exchange future has been queued up.
- */
- boolean topologyChanged() {
- return !futQ.isEmpty() || busy;
- }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/05a86a2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 29ca5f4..40d3dc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -114,6 +114,10 @@ public class GridDhtPartitionDemander {
@Deprecated//Backward compatibility. To be removed in future.
private final AtomicInteger dmIdx = new AtomicInteger();
+ /** DemandWorker. */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private volatile DemandWorker worker;
+
/** Cached rebalance topics. */
private final Map<Integer, Object> rebalanceTopics;
@@ -166,6 +170,11 @@ public class GridDhtPartitionDemander {
rebalanceFut.onDone(false);
}
+ DemandWorker dw = worker;
+
+ if (dw != null)
+ dw.cancel();
+
lastExchangeFut = null;
lastTimeoutObj.set(null);
@@ -426,9 +435,9 @@ public class GridDhtPartitionDemander {
d.timeout(cctx.config().getRebalanceTimeout());
d.workerId(0);//old api support.
- DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
+ worker = new DemandWorker(dmIdx.incrementAndGet(), fut);
- dw.run(node, d);
+ worker.run(node, d);
}
}
@@ -1137,6 +1146,13 @@ public class GridDhtPartitionDemander {
return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
}
+ /** */
+ public void cancel() {
+ msgQ.clear();
+
+ msgQ.offer(new SupplyMessage(null, null));
+ }
+
/**
* @param node Node to demand from.
* @param topVer Topology version.
@@ -1159,7 +1175,7 @@ public class GridDhtPartitionDemander {
d.topic(topic(cntr));
d.workerId(id);
- if (topologyChanged(fut))
+ if (fut.isDone() || topologyChanged(fut))
return;
cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
@@ -1228,6 +1244,9 @@ public class GridDhtPartitionDemander {
continue; // While.
}
+ if (s.senderId() == null)
+ return; // Stopping now.
+
// Check that message was received from expected node.
if (!s.senderId().equals(node.id())) {
U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +