You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/14 18:38:24 UTC
[30/34] incubator-ignite git commit: ignite-1093
ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8f36482b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8f36482b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8f36482b
Branch: refs/heads/ignite-1093
Commit: 8f36482b33d05d20a065d2b3684d82ab7559b902
Parents: 50e188d
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Wed Aug 12 12:42:48 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Wed Aug 12 12:42:48 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 168 ++++++++-----------
.../dht/preloader/GridDhtPreloader.java | 19 +--
.../GridCacheMassiveRebalancingSelfTest.java | 19 ++-
3 files changed, 90 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f36482b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index f6a33c3..6c95707 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
@@ -305,6 +306,22 @@ public class GridDhtPartitionDemander {
ClusterNode node = e.getKey();
+ final long start = U.currentTimeMillis();
+
+ final CacheConfiguration cfg = cctx.config();
+
+ if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
+ U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+ ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + "]");
+
+ syncFut.listen(new CI1<Object>() {
+ @Override public void apply(Object t) {
+ U.log(log, "Completed rebalancing [cache=" + cctx.name() + ", mode="
+ + cfg.getRebalanceMode() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
+ }
+ });
+ }
+
GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
remainings.addAll(d.partitions());
@@ -342,50 +359,6 @@ public class GridDhtPartitionDemander {
}
}
}
-
- if (log.isInfoEnabled() && !d.partitions().isEmpty()) {
- LinkedList<Integer> s = new LinkedList<>(d.partitions());
-
- Collections.sort(s);
-
- StringBuilder sb = new StringBuilder();
-
- int start = -1;
-
- int prev = -1;
-
- Iterator<Integer> sit = s.iterator();
-
- while (sit.hasNext()) {
- int p = sit.next();
- if (start == -1) {
- start = p;
- prev = p;
- }
-
- if (prev < p - 1) {
- sb.append(start);
-
- if (start != prev)
- sb.append("-").append(prev);
-
- sb.append(", ");
-
- start = p;
- }
-
- if (!sit.hasNext()) {
- sb.append(start);
-
- if (start != p)
- sb.append("-").append(p);
- }
-
- prev = p;
- }
-
- log.info("Requested rebalancing [from node=" + node.id() + ", partitions=" + s.size() + " (" + sb.toString() + ")]");
- }
}
}
else if (delay > 0) {
@@ -659,82 +632,83 @@ public class GridDhtPartitionDemander {
void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
lastExchangeFut = lastFut;
}
-/**
- *
- */
-private class SyncFuture extends GridFutureAdapter<Object> {
- /** */
- private static final long serialVersionUID = 1L;
-
- private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>();
- private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
-
- public void append(UUID nodeId, Collection<Integer> parts) {
- remaining.put(nodeId, parts);
-
- missed.put(nodeId, new GridConcurrentHashSet<Integer>());
- }
+ /**
+ *
+ */
+ private class SyncFuture extends GridFutureAdapter<Object> {
+ /** */
+ private static final long serialVersionUID = 1L;
- void cancel(UUID nodeId) {
- if (isDone())
- return;
+ private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>();
- remaining.remove(nodeId);
+ private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
- checkIsDone();
- }
+ public void append(UUID nodeId, Collection<Integer> parts) {
+ remaining.put(nodeId, parts);
- void onMissedPartition(UUID nodeId, int p) {
- if (missed.get(nodeId) == null)
missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+ }
- missed.get(nodeId).add(p);
- }
-
- void onPartitionDone(UUID nodeId, int p) {
- if (isDone())
- return;
+ void cancel(UUID nodeId) {
+ if (isDone())
+ return;
- Collection<Integer> parts = remaining.get(nodeId);
+ remaining.remove(nodeId);
- parts.remove(p);
+ checkIsDone();
+ }
- if (parts.isEmpty()) {
- remaining.remove(nodeId);
+ void onMissedPartition(UUID nodeId, int p) {
+ if (missed.get(nodeId) == null)
+ missed.put(nodeId, new GridConcurrentHashSet<Integer>());
- if (log.isDebugEnabled())
- log.debug("Completed full partition iteration for node [nodeId=" + nodeId + ']');
+ missed.get(nodeId).add(p);
}
- checkIsDone();
- }
+ void onPartitionDone(UUID nodeId, int p) {
+ if (isDone())
+ return;
- private void checkIsDone() {
- if (remaining.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Completed sync future.");
+ Collection<Integer> parts = remaining.get(nodeId);
+
+ parts.remove(p);
- Collection<Integer> m = new HashSet<>();
+ if (parts.isEmpty()) {
+ remaining.remove(nodeId);
- for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
- if (e.getValue() != null && !e.getValue().isEmpty())
- m.addAll(e.getValue());
+ if (log.isDebugEnabled())
+ log.debug("Completed full partition iteration for node [nodeId=" + nodeId + ']');
}
- if (!m.isEmpty()) {
+ checkIsDone();
+ }
+
+ private void checkIsDone() {
+ if (remaining.isEmpty()) {
if (log.isDebugEnabled())
- log.debug("Reassigning partitions that were missed: " + m);
+ log.debug("Completed sync future.");
- cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
- }
+ Collection<Integer> m = new HashSet<>();
- missed.clear();
+ for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
+ if (e.getValue() != null && !e.getValue().isEmpty())
+ m.addAll(e.getValue());
+ }
+
+ if (!m.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Reassigning partitions that were missed: " + m);
+
+ cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
+ }
+
+ missed.clear();
- cctx.shared().exchange().scheduleResendPartitions();//TODO: Is in necessary?
+ cctx.shared().exchange().scheduleResendPartitions();//TODO: Is in necessary?
- onDone();
+ onDone();
+ }
}
}
}
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f36482b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index d994a19..7f99ebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
@@ -220,24 +219,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void onInitialExchangeComplete(@Nullable Throwable err) {
- if (err == null) {
+ if (err == null)
startFut.onDone();
-
- final long start = U.currentTimeMillis();
-
- final CacheConfiguration cfg = cctx.config();
-
- if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
- U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
-
- demander.syncFuture().listen(new CI1<Object>() {
- @Override public void apply(Object t) {
- U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
- "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
- }
- });
- }
- }
else
startFut.onDone(err);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f36482b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
index 5148753..ca95905 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -113,12 +115,27 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
long spend = (System.currentTimeMillis() - start) / 1000;
+ IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+ IgniteInternalFuture f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+
stopGrid(0);
- Thread.sleep(20000);
+ while (f1 == ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture() ||
+ f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
+ U.sleep(100);
+
+ ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+ ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+
+ f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
stopGrid(1);
+ while (f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
+ U.sleep(100);
+
+ ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+
checkData(grid(2));
log.info("Spend " + spend + " seconds to preload entries.");