You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/25 13:24:47 UTC
ignite git commit: ignite-1093
Repository: ignite
Updated Branches:
refs/heads/ignite-1093 96e9a97eb -> 933fd6fdd
ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/933fd6fd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/933fd6fd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/933fd6fd
Branch: refs/heads/ignite-1093
Commit: 933fd6fdd2ea0807dceaf158fd2fe81c89245923
Parents: 96e9a97
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Tue Aug 25 14:24:37 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Tue Aug 25 14:24:37 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 263 +++++++++++--------
...GridCacheMassiveRebalancingSyncSelfTest.java | 2 +-
2 files changed, 154 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/933fd6fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 9d244ef..a5f8226 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
+import org.apache.ignite.thread.*;
import org.jsr166.*;
import java.util.*;
@@ -91,50 +92,6 @@ public class GridDhtPartitionDemander {
*
*/
void start() {
- int rebalanceOrder = cctx.config().getRebalanceOrder();
-
- if (!CU.isMarshallerCache(cctx.name())) {
- if (log.isDebugEnabled())
- log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
-
- try {
- cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
- "[cacheName=" + cctx.name() + ']');
-
- return;
- }
- catch (IgniteCheckedException e) {
- throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
- }
- }
-
- if (rebalanceOrder > 0) {
- IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
-
- try {
- if (fut != null) {
- if (log.isDebugEnabled())
- log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
- ", rebalanceOrder=" + rebalanceOrder + ']');
-
- fut.get();
- }
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
- "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
-
- return;
- }
- catch (IgniteCheckedException e) {
- throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
- }
- }
}
/**
@@ -227,92 +184,94 @@ public class GridDhtPartitionDemander {
if (delay == 0 || force) {
assert assigns != null;
- AffinityTopologyVersion topVer = assigns.topologyVersion();
+ final AffinityTopologyVersion topVer = assigns.topologyVersion();
- if (syncFut.isInited()) {
- if (!syncFut.isDone())
- syncFut.onCancel();
+ SyncFuture curSyncFut = syncFut;
- syncFut = new SyncFuture(assigns);
+ if (curSyncFut.isInited()) {
+ if (!curSyncFut.isDone())
+ curSyncFut.onCancel();
+
+ curSyncFut = new SyncFuture(assigns);
+
+ syncFut = curSyncFut;
}
else
- syncFut.init(assigns);
+ curSyncFut.init(assigns);
if (assigns.isEmpty() || topologyChanged(topVer)) {
- syncFut.onCancel();
+ curSyncFut.onCancel();
return;
}
- for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
- GridDhtPartitionDemandMessage d = e.getValue();
-
- d.timeout(cctx.config().getRebalanceTimeout());
- d.workerId(0);//old api support.
+ final SyncFuture cSF = curSyncFut;
- final ClusterNode node = e.getKey();
-
- final long start = U.currentTimeMillis();
-
- final CacheConfiguration cfg = cctx.config();
+ new IgniteThread(cctx.gridName(), "demand-thread-" + cctx.cache().name(), new Runnable() {
+ @Override public void run() {
+ if (!CU.isMarshallerCache(cctx.name())) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
- final AffinityTopologyVersion top = d.topologyVersion();
+ try {
+ IgniteInternalFuture fut = cctx.kernalContext().cache().marshallerCache().preloader().syncFuture();
- if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
- U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
- ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+ if (!topologyChanged(topVer))
+ fut.get();
+ else {
+ cSF.onCancel();
- syncFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> t) {
- Boolean cancelled = ((SyncFuture)t).cancelled();
- U.log(log, (cancelled ? "Cancelled" : "Completed") + " rebalancing [cache=" + cctx.name() + ", mode="
- + cfg.getRebalanceMode() + ", from node=" + node.id() + ", topology=" + top +
- ", time=" + (U.currentTimeMillis() - start) + " ms]");
+ return;
+ }
}
- });
- }
-
- GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
-
- remainings.addAll(d.partitions());
+ catch (IgniteInterruptedCheckedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
+ "[cacheName=" + cctx.name() + ']');
- syncFut.append(node.id(), remainings);
-
- int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
-
- List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
-
- for (int cnt = 0; cnt < lsnrCnt; cnt++)
- sParts.add(new HashSet<Integer>());
-
- Iterator<Integer> it = d.partitions().iterator();
-
- int cnt = 0;
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
+ }
+ }
- while (it.hasNext())
- sParts.get(cnt++ % lsnrCnt).add(it.next());
+ int rebalanceOrder = cctx.config().getRebalanceOrder();
- for (cnt = 0; cnt < lsnrCnt; cnt++) {
+ if (rebalanceOrder > 0) {
+ IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
- if (!sParts.get(cnt).isEmpty()) {
+ try {
+ if (fut != null) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
+ ", rebalanceOrder=" + rebalanceOrder + ']');
- // Create copy.
- GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+ if (!topologyChanged(topVer))
+ fut.get();
+ else {
+ cSF.onCancel();
- initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
+ return;
+ }
+ }
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
+ "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
- try {
- cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
+ return;
}
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to send partition demand message to local node", ex);
+ catch (IgniteCheckedException e) {
+ throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
}
-
- if (log.isDebugEnabled())
- log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]");
}
+
+ requestPartitions(assigns, topVer, cSF);
}
- }
+ }).start();
+
}
else if (delay > 0) {
GridTimeoutObject obj = lastTimeoutObj.get();
@@ -341,6 +300,90 @@ public class GridDhtPartitionDemander {
}
/**
+ * Sends partition demand messages to every supplier node listed in the assignments.
+ * Before each node, cancels {@code fut} and returns if {@code topologyChanged(topVer)} is true.
+ *
+ * @param assigns Assigns.
+ * @param topVer Topology version the assignments belong to.
+ * @param fut Sync future that tracks these assignments.
+ */
+ private void requestPartitions(
+ final GridDhtPreloaderAssignments assigns,
+ AffinityTopologyVersion topVer,
+ SyncFuture fut) {
+ for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+ if (topologyChanged(topVer)) {
+ fut.onCancel();
+
+ return;
+ }
+
+ GridDhtPartitionDemandMessage d = e.getValue();
+
+ d.timeout(cctx.config().getRebalanceTimeout());
+ d.workerId(0);//old api support.
+
+ final ClusterNode node = e.getKey();
+ final long start = U.currentTimeMillis();
+ final CacheConfiguration cfg = cctx.config();
+ final AffinityTopologyVersion top = d.topologyVersion();
+
+ if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
+ U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+ ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+
+ // Listen on the future passed in rather than the syncFut field: this method runs on a
+ // separate demand thread and the field may be reassigned to a newer future meanwhile.
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> t) {
+ boolean completed = ((SyncFuture)t).isCompleted();
+ U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing [cache=" + cctx.name() + ", mode="
+ + cfg.getRebalanceMode() + ", from node=" + node.id() + ", topology=" + top +
+ ", time=" + (U.currentTimeMillis() - start) + " ms]");
+ }
+ });
+ }
+
+ GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
+ remainings.addAll(d.partitions());
+
+ fut.append(node.id(), remainings);
+
+ int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
+
+ List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+
+ for (int cnt = 0; cnt < lsnrCnt; cnt++)
+ sParts.add(new HashSet<Integer>());
+
+ Iterator<Integer> it = d.partitions().iterator();
+
+ int cnt = 0;
+
+ while (it.hasNext())
+ sParts.get(cnt++ % lsnrCnt).add(it.next());
+
+ for (cnt = 0; cnt < lsnrCnt; cnt++) {
+ if (!sParts.get(cnt).isEmpty()) {
+ // Create copy.
+ GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+ initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
+
+ try {
+ cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to send partition demand message to local node", ex);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]");
+ }
+ }
+ }
+ }
+
+ /**
* @param c Partitions.
* @return String representation of partitions list.
*/
@@ -647,7 +690,7 @@ public class GridDhtPartitionDemander {
/** Assignments. */
private volatile GridDhtPreloaderAssignments assigns;
- private volatile boolean cancelled = false;
+ private volatile boolean completed = true;
SyncFuture(GridDhtPreloaderAssignments assigns) {
this.assigns = assigns;
@@ -691,7 +734,7 @@ public class GridDhtPartitionDemander {
void onCancel() {
remaining.clear();
- cancelled = true;
+ completed = false;
checkIsDone();
}
@@ -702,13 +745,13 @@ public class GridDhtPartitionDemander {
remaining.remove(nodeId);
- cancelled = true;
+ completed = false;
checkIsDone();
}
- boolean cancelled() {
- return cancelled;
+ boolean isCompleted() {
+ return completed;
}
void onMissedPartition(UUID nodeId, int p, AffinityTopologyVersion topVer) {
@@ -774,7 +817,7 @@ public class GridDhtPartitionDemander {
if (lsnr != null)
cctx.events().removeListener(lsnr);
- onDone(cancelled);
+ onDone(completed);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/933fd6fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
index b0fc011..f69b710 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -39,7 +39,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
/** */
protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
- private static int TEST_SIZE = 5_000_000;
+ private static int TEST_SIZE = 1_000_000;
/** cache name. */
protected static String CACHE_NAME_DHT = "cache";