You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/22 13:13:38 UTC
[11/16] ignite git commit: 1093
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9e0eafed
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9e0eafed
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9e0eafed
Branch: refs/heads/ignite-1093-2
Commit: 9e0eafed477a3e09e4527f436be3de57d59c9848
Parents: 8c1aa26
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Sep 18 16:52:38 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Sep 18 16:52:38 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 126 +++++++++----------
.../dht/preloader/GridDhtPartitionSupplier.java | 8 +-
.../GridDhtPartitionSupplyMessageV2.java | 70 +++++------
3 files changed, 89 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e0eafed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 498b16d..596ec2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
@@ -189,11 +190,11 @@ public class GridDhtPartitionDemander {
}
/**
- * @param topVer Topology version.
+ * @param fut Future.
* @return {@code True} if topology changed.
*/
- private boolean topologyChanged(AffinityTopologyVersion topVer) {
- return !cctx.affinity().affinityTopologyVersion().equals(topVer);
+ private boolean topologyChanged(SyncFuture fut) {
+ return !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || fut != syncFut;
}
/**
@@ -218,7 +219,7 @@ public class GridDhtPartitionDemander {
try {
SyncFuture wFut = (SyncFuture)cctx.kernalContext().cache().internalCache(name).preloader().syncFuture();
- if (!topologyChanged(fut.assigns.topologyVersion()))
+ if (!topologyChanged(fut))
wFut.get();
else {
fut.cancel();
@@ -257,8 +258,6 @@ public class GridDhtPartitionDemander {
final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy());
- syncFut = fut;
-
if (!oldFut.isDummy())
oldFut.cancel();
else
@@ -268,10 +267,15 @@ public class GridDhtPartitionDemander {
}
});
- if (fut.doneIfEmpty())// Done in case empty assigns.
+ syncFut = fut;
+
+ if (assigns.isEmpty()) {
+ fut.doneIfEmpty();
+
return;
+ }
- if (topologyChanged(fut.topologyVersion())) {
+ if (topologyChanged(fut)) {
fut.cancel();
return;
@@ -298,7 +302,7 @@ public class GridDhtPartitionDemander {
log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
", rebalanceOrder=" + rebalanceOrder + ']');
- if (!topologyChanged(fut.topologyVersion()))
+ if (!topologyChanged(fut))
oFut.get();
else {
fut.cancel();
@@ -323,7 +327,7 @@ public class GridDhtPartitionDemander {
}
}
- requestPartitions(fut);
+ requestPartitions(fut, assigns);
}
});
@@ -358,13 +362,9 @@ public class GridDhtPartitionDemander {
/**
* @param fut Future.
*/
- private void requestPartitions(SyncFuture fut) {
- final GridDhtPreloaderAssignments assigns = fut.assigns;
-
- AffinityTopologyVersion topVer = fut.topologyVersion();
-
+ private void requestPartitions(SyncFuture fut, GridDhtPreloaderAssignments assigns) {
for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
- if (topologyChanged(topVer)) {
+ if (topologyChanged(fut)) {
fut.cancel();
return;
@@ -411,7 +411,7 @@ public class GridDhtPartitionDemander {
initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
try {
- if (!topologyChanged(topVer))
+ if (!topologyChanged(fut))
cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
else
fut.cancel();
@@ -498,16 +498,12 @@ public class GridDhtPartitionDemander {
final SyncFuture fut = syncFut;
- if (!fut.topologyVersion().equals(topVer))//will check topology changed at loop.
- return;
-
ClusterNode node = cctx.node(id);
- if (node == null) {
- fut.cancel(id);
+ assert node != null;
+ if (!fut.topologyVersion().equals(topVer) || topologyChanged(fut))
return;
- }
if (log.isDebugEnabled())
log.debug("Received supply message: " + supply);
@@ -527,7 +523,7 @@ public class GridDhtPartitionDemander {
try {
// Preload.
for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
- if (topologyChanged(topVer)) {
+ if (topologyChanged(fut)) {
fut.cancel();
return;
@@ -609,7 +605,10 @@ public class GridDhtPartitionDemander {
for (Integer miss : supply.missed())
fut.partitionDone(id, miss);
- GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
+ GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
+ supply.updateSequence(), supply.topologyVersion(), cctx.cacheId());
+
+ d.timeout(cctx.config().getRebalanceTimeout());
if (d != null) {
// Create copy.
@@ -618,7 +617,7 @@ public class GridDhtPartitionDemander {
nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
- if (!topologyChanged(topVer)) {
+ if (!topologyChanged(fut)) {
// Send demand message.
cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
@@ -761,8 +760,12 @@ public class GridDhtPartitionDemander {
/** Lock. */
private final Lock lock = new ReentrantLock();
- /** Assignments. */
- private final GridDhtPreloaderAssignments assigns;
+ /** Exchange future. */
+ @GridToStringExclude
+ private final GridDhtPartitionsExchangeFuture exchFut;
+
+ /** Topology version. */
+ private final AffinityTopologyVersion topVer;
/**
* @param assigns Assigns.
@@ -774,7 +777,10 @@ public class GridDhtPartitionDemander {
GridCacheContext<?, ?> cctx,
IgniteLogger log,
boolean sentStopEvnt) {
- this.assigns = assigns;
+ assert assigns != null;
+
+ this.exchFut = assigns.exchangeFuture();
+ this.topVer = assigns.topologyVersion();
this.cctx = cctx;
this.log = log;
this.sendStoppedEvnt = sentStopEvnt;
@@ -792,7 +798,8 @@ public class GridDhtPartitionDemander {
* Dummy future. Will be done by real one.
*/
public SyncFuture() {
- this.assigns = null;
+ this.exchFut = null;
+ this.topVer = null;
this.cctx = null;
this.log = null;
this.sendStoppedEvnt = false;
@@ -802,14 +809,14 @@ public class GridDhtPartitionDemander {
* @return Topology version.
*/
public AffinityTopologyVersion topologyVersion() {
- return assigns != null ? assigns.topologyVersion() : null;
+ return topVer;
}
/**
* @return Is dummy (created at demander creation).
*/
private boolean isDummy() {
- return assigns == null;
+ return topVer == null;
}
/**
@@ -828,41 +835,22 @@ public class GridDhtPartitionDemander {
}
/**
- * @param node Node.
- * @return Demand message.
- */
- private GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
- if (isDone())
- return null;
-
- return assigns.get(node);
- }
-
- /**
- * @return future is done.
+ *
*/
- private boolean doneIfEmpty() {
+ private void doneIfEmpty() {
lock.lock();
try {
if (isDone())
- return true;
-
- if (assigns.isEmpty()) {
- assert remaining.isEmpty();
+ return;
- if (assigns.topologyVersion().topologyVersion() > 1)// Not an initial topology.
- if (log.isDebugEnabled())
- log.debug("Rebalancing is not required [cache=" + cctx.name() +
- ", topology=" + assigns.topologyVersion() + "]");
+ assert remaining.isEmpty();
- checkIsDone();
+ if (log.isDebugEnabled())
+ log.debug("Rebalancing is not required [cache=" + cctx.name() +
+ ", topology=" + topVer + "]");
- return true;
- }
- else {
- return false;
- }
+ checkIsDone();
}
finally {
lock.unlock();
@@ -953,7 +941,7 @@ public class GridDhtPartitionDemander {
if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
- assigns.exchangeFuture().discoveryEvent());
+ exchFut.discoveryEvent());
Collection<Integer> parts = remaining.get(nodeId).get2();
@@ -1011,15 +999,15 @@ public class GridDhtPartitionDemander {
}
if (!m.isEmpty()) {
- U.log(log,("Reassigning partitions that were missed: " + m));
+ U.log(log, ("Reassigning partitions that were missed: " + m));
- cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
+ cctx.shared().exchange().forceDummyExchange(true, exchFut);
}
cctx.shared().exchange().scheduleResendPartitions();
if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
- preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
+ preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
onDone();
}
@@ -1163,7 +1151,7 @@ public class GridDhtPartitionDemander {
// Get the same collection that will be sent in the message.
Collection<Integer> remaining = d.partitions();
- if (topologyChanged(topVer))
+ if (topologyChanged(fut))
return missed;
cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
@@ -1195,7 +1183,7 @@ public class GridDhtPartitionDemander {
// While.
// =====
- while (!topologyChanged(topVer)) {
+ while (!topologyChanged(fut)) {
SupplyMessage s = poll(msgQ, timeout);
// If timed out.
@@ -1358,7 +1346,7 @@ public class GridDhtPartitionDemander {
}
}
}
- while (retry && !topologyChanged(topVer));
+ while (retry && !topologyChanged(fut));
return missed;
}
@@ -1375,13 +1363,13 @@ public class GridDhtPartitionDemander {
demandLock.readLock().lock();
try {
- GridDhtPartitionsExchangeFuture exchFut = fut.assigns.exchangeFuture();
+ GridDhtPartitionsExchangeFuture exchFut = fut.exchFut;
- AffinityTopologyVersion topVer = fut.assigns.topologyVersion();
+ AffinityTopologyVersion topVer = fut.topVer;
Collection<Integer> missed = new HashSet<>();
- if (topologyChanged(topVer)) {
+ if (topologyChanged(fut)) {
fut.cancel();
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e0eafed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index ee01158..0641612 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -185,7 +185,7 @@ class GridDhtPartitionSupplier {
if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
return;
- GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),
+ GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(
d.updateSequence(), cctx.cacheId(), d.topologyVersion());
ClusterNode node = cctx.discovery().node(id);
@@ -289,7 +289,7 @@ class GridDhtPartitionSupplier {
if (!reply(node, d, s))
return;
- s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+ s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
cctx.cacheId(), d.topologyVersion());
}
}
@@ -365,7 +365,7 @@ class GridDhtPartitionSupplier {
if (!reply(node, d, s))
return;
- s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+ s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
cctx.cacheId(), d.topologyVersion());
}
}
@@ -477,7 +477,7 @@ class GridDhtPartitionSupplier {
if (!reply(node, d, s))
return;
- s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+ s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
cctx.cacheId(), d.topologyVersion());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e0eafed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
index 01056ac..17ebb26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -17,17 +17,30 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-import java.util.*;
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* Partition supply message.
@@ -36,9 +49,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
/** */
private static final long serialVersionUID = 0L;
- /** Worker ID. */
- private int workerId = -1;
-
/** Update sequence. */
private long updateSeq;
@@ -66,17 +76,14 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
private int msgSize;
/**
- * @param workerId Worker ID.
* @param updateSeq Update sequence for this node.
* @param cacheId Cache ID.
*/
- GridDhtPartitionSupplyMessageV2(int workerId, long updateSeq, int cacheId, AffinityTopologyVersion topVer) {
- assert workerId >= 0;
+ GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer) {
assert updateSeq > 0;
this.cacheId = cacheId;
this.updateSeq = updateSeq;
- this.workerId = workerId;
this.topVer = topVer;
}
@@ -98,13 +105,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
}
/**
- * @return Worker ID.
- */
- int workerId() {
- return workerId;
- }
-
- /**
* @return Update sequence.
*/
long updateSequence() {
@@ -255,7 +255,7 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
for (CacheEntryInfoCollection col : infos().values()) {
- List<GridCacheEntryInfo> entries = col.infos();
+ List<GridCacheEntryInfo> entries = col.infos();
for (int i = 0; i < entries.size(); i++)
entries.get(i).unmarshal(cacheCtx, ldr);
@@ -320,12 +320,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
writer.incrementState();
- case 9:
- if (!writer.writeInt("workerId", workerId))
- return false;
-
- writer.incrementState();
-
}
return true;
@@ -390,17 +384,9 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
reader.incrementState();
- case 9:
- workerId = reader.readInt("workerId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
- return true;
+ return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class);
}
/** {@inheritDoc} */
@@ -410,7 +396,7 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 10;
+ return 9;
}
/** {@inheritDoc} */