You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/22 13:13:40 UTC
[13/16] ignite git commit: 1093
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93caa0b8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93caa0b8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93caa0b8
Branch: refs/heads/ignite-1093-2
Commit: 93caa0b8bb9eb4f5f667d0533c15ce7a9efcdc17
Parents: 2cb397a
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Sep 18 21:21:17 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Sep 18 21:21:17 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 11 ++-
.../dht/preloader/GridDhtPartitionSupplier.java | 99 +++++++++-----------
2 files changed, 52 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/93caa0b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 596ec2f..a2f8c01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -827,7 +827,7 @@ public class GridDhtPartitionDemander {
lock.lock();
try {
- remaining.put(nodeId, new IgniteBiTuple<>(System.currentTimeMillis(), parts));
+ remaining.put(nodeId, new IgniteBiTuple<>(U.currentTimeMillis(), parts));
}
finally {
lock.unlock();
@@ -949,7 +949,8 @@ public class GridDhtPartitionDemander {
parts.remove(p);
if (parts.isEmpty()) {
- U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
+ U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
+ "rebalancing [cache=" + cctx.name() +
", fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
@@ -988,6 +989,9 @@ public class GridDhtPartitionDemander {
*/
private void checkIsDone() {
if (remaining.isEmpty()) {
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
+ preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+
if (log.isDebugEnabled())
log.debug("Completed sync future.");
@@ -1006,9 +1010,6 @@ public class GridDhtPartitionDemander {
cctx.shared().exchange().scheduleResendPartitions();
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
- preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
onDone();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/93caa0b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 50d64f9..1d8572a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -24,7 +24,6 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -41,12 +40,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T4;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.jsr166.ConcurrentHashMap8;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -72,7 +70,7 @@ class GridDhtPartitionSupplier {
private IgnitePredicate<GridCacheEntryInfo> preloadPred;
/** Supply context map. */
- private final ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+ private final ConcurrentHashMap8<T4, SupplyContext> scMap = new ConcurrentHashMap8<>();
/** Rebalancing listener. */
private GridLocalEventListener lsnr;
@@ -98,19 +96,8 @@ class GridDhtPartitionSupplier {
void start() {
lsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
- if (evt instanceof CacheRebalancingEvent) {
- ClusterNode node = ((CacheRebalancingEvent)evt).discoveryNode();
-
- int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
-
- for (int idx = 0; idx < lsnrCnt; idx++) {
- T2<UUID, Integer> scId = new T2<>(node.id(), idx);
-
- tryClearContext(scMap, scId, log);
- }
- }
- else if (evt instanceof DiscoveryEvent) {
- scMap.clear();
+ if (evt instanceof DiscoveryEvent) {
+ clearContexts(scMap, log, cctx);
}
else {
assert false;
@@ -118,7 +105,9 @@ class GridDhtPartitionSupplier {
}
};
- cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED);
+ //todo: rebalance stopped.
+
+ cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
startOldListeners();
}
@@ -133,19 +122,33 @@ class GridDhtPartitionSupplier {
}
/**
- * Clear context by id.
+ * Clear contexts.
*
* @param map Context map.
- * @param scId Context id.
* @param log Logger.
+ * @param cctx Context.
*/
- private static void tryClearContext(
- ConcurrentHashMap8<T2, SupplyContext> map,
- T2<UUID, Integer> scId,
- IgniteLogger log) {
- SupplyContext sc = map.get(scId);
+ private static void clearContexts(
+ ConcurrentHashMap8<T4, SupplyContext> map, IgniteLogger log, GridCacheContext<?, ?> cctx) {
+ for (Map.Entry<T4, SupplyContext> entry : map.entrySet()) {
+ clearContext(map, entry.getKey(), entry.getValue(), log, cctx);
+ }
+ }
- if (sc != null) {
+ /**
+ * Clear context.
+ *
+ * @param map Context map.
+ * @param log Logger.
+ */
+ private static boolean clearContext(
+ ConcurrentHashMap8<T4, SupplyContext> map,
+ T4 t,
+ SupplyContext sc,
+ IgniteLogger log,
+ GridCacheContext<?, ?> cctx) {
+
+ if (!t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null) {
Iterator it = sc.entryIt;
if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) {
@@ -159,9 +162,11 @@ class GridDhtPartitionSupplier {
log.error("Iterator close failed.", e);
}
}
+
+ return map.remove(t, sc);
}
- map.remove(scId, sc);
+ return false;
}
/**
@@ -190,21 +195,16 @@ class GridDhtPartitionSupplier {
ClusterNode node = cctx.discovery().node(id);
- T2<UUID, Integer> scId = new T2<>(id, idx);
+ T4<UUID, Integer, AffinityTopologyVersion, Long> scId = new T4<>(id, idx, d.topologyVersion(), d.updateSequence());
try {
SupplyContext sctx = scMap.get(scId);
- if (sctx == null) {
- if (d.partitions().isEmpty())
- return;
- }
- else {
- if (!sctx.top.equals(d.topologyVersion())) {
- tryClearContext(scMap, scId, log);
+ if (sctx == null && d.partitions().isEmpty())
+ return;
- sctx = scMap.get(scId);
- }
+ if (sctx != null && !d.partitions().isEmpty()) {
+ assert false;
}
long bCnt = 0;
@@ -277,7 +277,7 @@ class GridDhtPartitionSupplier {
if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr, d.topologyVersion());
+ saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
swapLsnr = null;
@@ -321,8 +321,7 @@ class GridDhtPartitionSupplier {
partIt,
null,
swapLsnr,
- part,
- d.topologyVersion());
+ part);
}
}
@@ -353,7 +352,7 @@ class GridDhtPartitionSupplier {
if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr, d.topologyVersion());
+ saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
swapLsnr = null;
@@ -436,8 +435,7 @@ class GridDhtPartitionSupplier {
partIt,
null,
null,
- part,
- d.topologyVersion());
+ part);
}
}
@@ -465,7 +463,7 @@ class GridDhtPartitionSupplier {
if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr, d.topologyVersion());
+ saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
swapLsnr = null;
@@ -557,13 +555,12 @@ class GridDhtPartitionSupplier {
* @param swapLsnr Swap listener.
*/
private void saveSupplyContext(
- T2 t,
+ T4 t,
int phase,
Iterator<Integer> partIt,
int part,
- Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr,
- AffinityTopologyVersion top) {
- scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part, top));
+ Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) {
+ scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
}
/**
@@ -585,9 +582,6 @@ class GridDhtPartitionSupplier {
/** Partition. */
private final int part;
- /** Topology version. */
- private final AffinityTopologyVersion top;
-
/**
* @param phase Phase.
* @param partIt Partition iterator.
@@ -596,13 +590,12 @@ class GridDhtPartitionSupplier {
* @param part Partition.
*/
public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
- GridCacheEntryInfoCollectSwapListener swapLsnr, int part, AffinityTopologyVersion top) {
+ GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
this.phase = phase;
this.partIt = partIt;
this.entryIt = entryIt;
this.swapLsnr = swapLsnr;
this.part = part;
- this.top = top;
}
}