You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/10 12:00:57 UTC
[41/41] ignite git commit: ignite-5578 exchange future cleanup
ignite-5578 exchange future cleanup
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a38c46d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a38c46d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a38c46d
Branch: refs/heads/ignite-5578-1
Commit: 5a38c46deb5b61bc3a32d7cc1b2b2e2190035a9f
Parents: f2568b7
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 10 15:00:00 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 10 15:00:00 2017 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 140 +++++++++++--------
.../processors/cache/GridCachePreloader.java | 8 +-
.../cache/GridCachePreloaderAdapter.java | 4 +-
.../preloader/ForceRebalanceExchangeTask.java | 58 ++++++++
.../dht/preloader/GridDhtPartitionDemander.java | 22 +--
.../preloader/GridDhtPartitionExchangeId.java | 61 +++++++-
.../GridDhtPartitionsExchangeFuture.java | 136 +-----------------
.../dht/preloader/GridDhtPreloader.java | 33 ++---
.../preloader/GridDhtPreloaderAssignments.java | 21 ++-
.../RebalanceReassignExchangeTask.java | 44 ++++++
10 files changed, 291 insertions(+), 236 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a38c46d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f1db79a..1021e03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCach
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -77,6 +78,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
@@ -377,7 +379,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
"Node joined with smaller-than-local " +
"order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
- exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
exchFut = exchangeFuture(exchId, evt, cache,null, null);
}
@@ -390,7 +392,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
ExchangeActions exchActions = stateChangeMsg.exchangeActions();
if (exchActions != null) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
}
@@ -401,7 +403,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
ExchangeActions exchActions = batch.exchangeActions();
if (exchActions != null) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
}
@@ -411,7 +413,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (msg.exchangeId() == null) {
if (msg.exchangeNeeded()) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
exchFut = exchangeFuture(exchId, evt, cache, null, msg);
}
@@ -422,7 +424,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
else if (customMsg instanceof SnapshotDiscoveryMessage
&& ((SnapshotDiscoveryMessage) customMsg).needExchange()) {
- exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
+ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
exchFut = exchangeFuture(exchId, evt, null, null, null);
}
@@ -486,7 +488,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
assert discoEvt.topologyVersion() == startTopVer.topologyVersion();
- return exchangeId(cctx.localNode().id(), startTopVer, EVT_NODE_JOINED);
+ return exchangeId(cctx.localNode().id(), startTopVer, discoEvt);
}
/**
@@ -863,27 +865,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param exchFut Exchange future.
- * @param reassign Dummy reassign flag.
+ * @param exchId Exchange ID.
*/
- public void forceDummyExchange(boolean reassign,
- GridDhtPartitionsExchangeFuture exchFut) {
- exchWorker.addExchangeFuture(
- new GridDhtPartitionsExchangeFuture(cctx, reassign, exchFut.discoveryEvent(), exchFut.exchangeId()));
+ public void forceReassign(GridDhtPartitionExchangeId exchId) {
+ exchWorker.forceReassign(exchId);
}
/**
- * Forces preload exchange.
- *
- * @param exchFut Exchange future.
+ * @param exchId Exchange ID.
+ * @return Rebalance future.
*/
- public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) {
- GridCompoundFuture<Boolean, Boolean> fut = new GridCompoundFuture<>(CU.boolReducer());
-
- exchWorker.addExchangeFuture(
- new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut));
-
- return fut;
+ public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionExchangeId exchId) {
+ return exchWorker.forceRebalance(exchId);
}
/**
@@ -1203,10 +1196,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param nodeId Cause node ID.
* @param topVer Topology version.
- * @param evt Event type.
- * @return Activity future ID.
+ * @param evt Event.
+ * @return Exchange ID instance.
*/
- private GridDhtPartitionExchangeId exchangeId(UUID nodeId, AffinityTopologyVersion topVer, int evt) {
+ private GridDhtPartitionExchangeId exchangeId(UUID nodeId, AffinityTopologyVersion topVer, DiscoveryEvent evt) {
return new GridDhtPartitionExchangeId(nodeId, evt, topVer);
}
@@ -1677,7 +1670,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @return {@code True} if this is exchange task.
*/
private static boolean isExchangeTask(CachePartitionExchangeWorkerTask task) {
- return task instanceof GridDhtPartitionsExchangeFuture;
+ return task instanceof GridDhtPartitionsExchangeFuture ||
+ task instanceof RebalanceReassignExchangeTask ||
+ task instanceof ForceRebalanceExchangeTask;
}
/**
@@ -1787,13 +1782,32 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * @param exchId Exchange ID.
+ */
+ void forceReassign(GridDhtPartitionExchangeId exchId) {
+ if (!hasPendingExchange() && !busy)
+ futQ.add(new RebalanceReassignExchangeTask(exchId));
+ }
+
+ /**
+ * @param exchId Exchange ID.
+ * @return Rebalance future.
+ */
+ IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionExchangeId exchId) {
+ GridCompoundFuture<Boolean, Boolean> fut = new GridCompoundFuture<>(CU.boolReducer());
+
+ futQ.add(new ForceRebalanceExchangeTask(exchId, fut));
+
+ return fut;
+ }
+
+ /**
* @param exchFut Exchange future.
*/
void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
assert exchFut != null;
- if (!exchFut.dummy() || (!hasPendingExchange() && !busy))
- futQ.offer(exchFut);
+ futQ.offer(exchFut);
if (log.isDebugEnabled())
log.debug("Added exchange future to exchange worker: " + exchFut);
@@ -1922,22 +1936,37 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
continue;
}
- assert task instanceof GridDhtPartitionsExchangeFuture;
-
- GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture)task;
-
busy = true;
Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
- boolean dummyReassign = exchFut.dummyReassign();
- boolean forcePreload = exchFut.forcePreload();
+ boolean forcePreload = false;
+
+ GridDhtPartitionExchangeId exchId;
+
+ GridDhtPartitionsExchangeFuture exchFut = null;
try {
if (isCancelled())
break;
- if (!exchFut.dummy() && !exchFut.forcePreload()) {
+ if (task instanceof RebalanceReassignExchangeTask) {
+ exchId = ((RebalanceReassignExchangeTask) task).exchangeId();
+ }
+ else if (task instanceof ForceRebalanceExchangeTask) {
+ forcePreload = true;
+
+ timeout = 0; // Force refresh.
+
+ exchId = ((ForceRebalanceExchangeTask)task).exchangeId();
+ }
+ else {
+ assert task instanceof GridDhtPartitionsExchangeFuture : task;
+
+ exchFut = (GridDhtPartitionsExchangeFuture)task;
+
+ exchId = exchFut.exchangeId();
+
lastInitializedFut = exchFut;
exchFut.init();
@@ -2003,18 +2032,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (!cctx.kernalContext().clientNode() && changed && !hasPendingExchange())
refreshPartitions();
}
- else {
- if (log.isDebugEnabled())
- log.debug("Got dummy exchange (will reassign)");
- if (!dummyReassign) {
- timeout = 0; // Force refresh.
-
- continue;
- }
- }
-
- if (!exchFut.skipPreload() ) {
+ if (!cctx.kernalContext().clientNode()) {
assignsMap = new HashMap<>();
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
@@ -2024,7 +2043,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
// Don't delay for dummy reassigns to avoid infinite recursion.
if (delay == 0 || forcePreload)
- assigns = grp.preloader().assign(exchFut);
+ assigns = grp.preloader().assign(exchId, exchFut);
assignsMap.put(grp.groupId(), assigns);
}
@@ -2059,6 +2078,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean assignsCancelled = false;
+ GridCompoundFuture<Boolean, Boolean> forcedRebFut = null;
+
+ if (task instanceof ForceRebalanceExchangeTask)
+ forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
+
for (Integer order : orderMap.descendingKeySet()) {
for (Integer grpId : orderMap.get(order)) {
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
@@ -2077,7 +2101,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
forcePreload,
cnt,
r,
- exchFut.forcedRebalanceFuture());
+ forcedRebFut);
if (cur != null) {
rebList.add(grp.cacheOrGroupName());
@@ -2087,13 +2111,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
- if (exchFut.forcedRebalanceFuture() != null)
- exchFut.forcedRebalanceFuture().markInitialized();
+ if (forcedRebFut != null)
+ forcedRebFut.markInitialized();
if (assignsCancelled) { // Pending exchange.
U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+ ", node=" + exchId.nodeId() + ']');
}
else if (r != null) {
Collections.reverse(rebList);
@@ -2102,20 +2126,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (!hasPendingExchange()) {
U.log(log, "Rebalancing started " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+ ", node=" + exchId.nodeId() + ']');
r.run(); // Starts rebalancing routine.
}
else
U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+ ", node=" + exchId.nodeId() + ']');
}
else
U.log(log, "Skipping rebalancing (nothing scheduled) " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() +
+ ", node=" + exchId.nodeId() + ']');
}
}
catch (IgniteInterruptedCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a38c46d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 4e74532..dc1624a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
@@ -63,10 +64,12 @@ public interface GridCachePreloader {
public void onInitialExchangeComplete(@Nullable Throwable err);
/**
- * @param exchFut Exchange future to assign.
+ * @param exchId Exchange ID.
+ * @param exchFut Exchange future.
* @return Assignments or {@code null} if detected that there are pending exchanges.
*/
- @Nullable public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut);
+ @Nullable public GridDhtPreloaderAssignments assign(GridDhtPartitionExchangeId exchId,
+ @Nullable GridDhtPartitionsExchangeFuture exchFut);
/**
* Adds assignments to preloader.
@@ -75,6 +78,7 @@ public interface GridCachePreloader {
* @param forcePreload Force preload flag.
* @param cnt Counter.
* @param next Runnable responsible for cache rebalancing start.
+ * @param forcedRebFut Rebalance future.
* @return Rebalancing runnable.
*/
public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a38c46d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index d2ca229..c0accf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
@@ -152,7 +153,8 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
+ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionExchangeId exchId,
+ GridDhtPartitionsExchangeFuture exchFut) {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a38c46d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java
new file mode 100644
index 0000000..c820175
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+
+/**
+ * Exchange worker task carrying the exchange ID and the compound future of a forced rebalance request.
+ */
+public class ForceRebalanceExchangeTask implements CachePartitionExchangeWorkerTask {
+ /** Exchange ID this task was created for. */
+ private final GridDhtPartitionExchangeId exchId;
+
+ /** Compound future associated with the forced rebalance. */
+ private final GridCompoundFuture<Boolean, Boolean> forcedRebFut;
+
+ /**
+ * @param exchId Exchange ID, asserted to be non-null.
+ * @param forcedRebFut Rebalance future, asserted to be non-null.
+ */
+ public ForceRebalanceExchangeTask(GridDhtPartitionExchangeId exchId, GridCompoundFuture<Boolean, Boolean> forcedRebFut) {
+ assert exchId != null;
+ assert forcedRebFut != null;
+
+ this.exchId = exchId;
+ this.forcedRebFut = forcedRebFut;
+ }
+
+ /**
+ * @return Exchange ID passed to the constructor.
+ */
+ public GridDhtPartitionExchangeId exchangeId() {
+ return exchId;
+ }
+
+ /**
+ * @return Rebalance future passed to the constructor.
+ */
+ public GridCompoundFuture<Boolean, Boolean> forcedRebalanceFuture() {
+ return forcedRebFut;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a38c46d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 4f34aba..248b739 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -189,7 +189,7 @@ public class GridDhtPartitionDemander {
}
/**
- * Force Rebalance.
+ * @return Rebalance future.
*/
IgniteInternalFuture<Boolean> forceRebalance() {
GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
@@ -207,7 +207,7 @@ public class GridDhtPartitionDemander {
exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- IgniteInternalFuture<Boolean> fut0 = ctx.exchange().forceRebalance(exchFut);
+ IgniteInternalFuture<Boolean> fut0 = ctx.exchange().forceRebalance(exchFut.exchangeId());
fut0.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> future) {
@@ -363,7 +363,7 @@ public class GridDhtPartitionDemander {
@Override public void onTimeout() {
exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
- ctx.exchange().forceRebalance(exchFut);
+ ctx.exchange().forceRebalance(exchFut.exchangeId());
}
});
}
@@ -861,9 +861,9 @@ public class GridDhtPartitionDemander {
/** Missed. */
private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
- /** Exchange future. */
+ /** Exchange ID. */
@GridToStringExclude
- private final GridDhtPartitionsExchangeFuture exchFut;
+ private final GridDhtPartitionExchangeId exchId;
/** Topology version. */
private final AffinityTopologyVersion topVer;
@@ -884,7 +884,7 @@ public class GridDhtPartitionDemander {
long updateSeq) {
assert assigns != null;
- exchFut = assigns.exchangeFuture();
+ exchId = assigns.exchangeId();
topVer = assigns.topologyVersion();
this.grp = grp;
@@ -898,7 +898,7 @@ public class GridDhtPartitionDemander {
* Dummy future. Will be done by real one.
*/
RebalanceFuture() {
- this.exchFut = null;
+ this.exchId = null;
this.topVer = null;
this.ctx = null;
this.grp = null;
@@ -1032,7 +1032,7 @@ public class GridDhtPartitionDemander {
return;
if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
- rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchFut.discoveryEvent());
+ rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchId.discoveryEvent());
T2<Long, Collection<Integer>> t = remaining.get(nodeId);
@@ -1108,7 +1108,7 @@ public class GridDhtPartitionDemander {
onDone(false); //Finished but has missed partitions, will force dummy exchange
- ctx.exchange().forceDummyExchange(true, exchFut);
+ ctx.exchange().forceReassign(exchId);
return;
}
@@ -1125,7 +1125,7 @@ public class GridDhtPartitionDemander {
*/
private void sendRebalanceStartedEvent() {
if (grp.eventRecordable(EVT_CACHE_REBALANCE_STARTED))
- rebalanceEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
+ rebalanceEvent(EVT_CACHE_REBALANCE_STARTED, exchId.discoveryEvent());
}
/**
@@ -1133,7 +1133,7 @@ public class GridDhtPartitionDemander {
*/
private void sendRebalanceFinishedEvent() {
if (grp.eventRecordable(EVT_CACHE_REBALANCE_STOPPED))
- rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+ rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, exchId.discoveryEvent());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a38c46d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
index 012634e..0a49415 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
@@ -23,6 +23,9 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -55,20 +58,27 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
/** Topology version. */
private AffinityTopologyVersion topVer;
+ /** */
+ @GridDirectTransient
+ private DiscoveryEvent discoEvt;
+
/**
* @param nodeId Node ID.
- * @param evt Event.
+ * @param discoEvt Event.
* @param topVer Topology version.
*/
- public GridDhtPartitionExchangeId(UUID nodeId, int evt, @NotNull AffinityTopologyVersion topVer) {
+ public GridDhtPartitionExchangeId(UUID nodeId, DiscoveryEvent discoEvt, AffinityTopologyVersion topVer) {
assert nodeId != null;
- assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == EVT_NODE_JOINED ||
- evt == EVT_DISCOVERY_CUSTOM_EVT;
- assert topVer.topologyVersion() > 0;
+ assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+ assert discoEvt != null;
this.nodeId = nodeId;
- this.evt = evt;
+ this.evt = discoEvt.type();
this.topVer = topVer;
+ this.discoEvt = discoEvt;
+
+ assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == EVT_NODE_JOINED ||
+ evt == EVT_DISCOVERY_CUSTOM_EVT;
}
/**
@@ -93,6 +103,45 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
}
/**
+ * @return Discovery event timestamp.
+ */
+ long eventTimestamp() {
+ assert discoEvt != null;
+
+ return discoEvt.timestamp();
+ }
+
+ /**
+ * @param discoEvt Discovery event.
+ */
+ void discoveryEvent(DiscoveryEvent discoEvt) {
+ this.discoEvt = discoEvt;
+ }
+
+ /**
+ * @return Discovery event.
+ */
+ DiscoveryEvent discoveryEvent() {
+ assert discoEvt != null;
+
+ return discoEvt;
+ }
+
+ /**
+ * @return Discovery event node. NOTE(review): no null assert on discoEvt here, unlike discoveryEvent() - confirm.
+ */
+ public ClusterNode eventNode() {
+ return discoEvt.eventNode();
+ }
+
+ /**
+ * @return Discovery event name.
+ */
+ public String discoveryEventName() {
+ return U.gridEventName(evt);
+ }
+
+ /**
* @return Order.
*/
public AffinityTopologyVersion topologyVersion() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a38c46d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 21a3a13..6d98601 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -75,7 +75,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -110,14 +109,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
implements Comparable<GridDhtPartitionsExchangeFuture>, CachePartitionExchangeWorkerTask, IgniteDiagnosticAware {
/** */
public static final String EXCHANGE_LOG = "org.apache.ignite.internal.exchange.time";
- /** Dummy flag. */
- private final boolean dummy;
-
- /** Force preload flag. */
- private final boolean forcePreload;
-
- /** Dummy reassign flag. */
- private final boolean reassign;
/** */
@GridToStringExclude
@@ -196,9 +187,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** */
private CacheAffinityChangeMessage affChangeMsg;
- /** Skip preload flag. */
- private boolean skipPreload;
-
/** */
private boolean clientOnlyExchange;
@@ -221,9 +209,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
@GridToStringExclude
private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
- /** Forced Rebalance future. */
- private GridCompoundFuture<Boolean, Boolean> forcedRebFut;
-
/** */
private volatile Map<Integer, Map<Integer, Long>> partHistReserved;
@@ -235,62 +220,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private final AtomicBoolean done = new AtomicBoolean();
/**
- * Dummy future created to trigger reassignments if partition
- * topology changed while preloading.
- *
- * @param cctx Cache context.
- * @param reassign Dummy reassign flag.
- * @param discoEvt Discovery event.
- * @param exchId Exchange id.
- */
- public GridDhtPartitionsExchangeFuture(
- GridCacheSharedContext cctx,
- boolean reassign,
- DiscoveryEvent discoEvt,
- GridDhtPartitionExchangeId exchId
- ) {
- dummy = true;
- forcePreload = false;
-
- this.exchId = exchId;
- this.reassign = reassign;
- this.discoEvt = discoEvt;
- this.cctx = cctx;
-
- log = cctx.logger(getClass());
- exchLog = cctx.logger(EXCHANGE_LOG);
-
- onDone(exchId.topologyVersion());
- }
-
- /**
- * Force preload future created to trigger reassignments if partition
- * topology changed while preloading.
- *
- * @param cctx Cache context.
- * @param discoEvt Discovery event.
- * @param exchId Exchange id.
- * @param forcedRebFut Forced Rebalance future.
- */
- public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, DiscoveryEvent discoEvt,
- GridDhtPartitionExchangeId exchId, GridCompoundFuture<Boolean, Boolean> forcedRebFut) {
- dummy = false;
- forcePreload = true;
-
- this.exchId = exchId;
- this.discoEvt = discoEvt;
- this.cctx = cctx;
- this.forcedRebFut = forcedRebFut;
-
- log = cctx.logger(getClass());
- exchLog = cctx.logger(EXCHANGE_LOG);
-
- reassign = true;
-
- onDone(exchId.topologyVersion());
- }
-
- /**
* @param cctx Cache context.
* @param busyLock Busy lock.
* @param exchId Exchange ID.
@@ -309,10 +238,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assert exchId.topologyVersion() != null;
assert exchActions == null || !exchActions.empty();
- dummy = false;
- forcePreload = false;
- reassign = false;
-
this.cctx = cctx;
this.busyLock = busyLock;
this.exchId = exchId;
@@ -351,41 +276,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * @return Skip preload flag.
- */
- public boolean skipPreload() {
- return skipPreload;
- }
-
- /**
- * @return Dummy flag.
- */
- public boolean dummy() {
- return dummy;
- }
-
- /**
- * @return Force preload flag.
- */
- public boolean forcePreload() {
- return forcePreload;
- }
-
- /**
- * @return Dummy reassign flag.
- */
- public boolean reassign() {
- return reassign;
- }
-
- /**
- * @return {@code True} if dummy reassign.
- */
- public boolean dummyReassign() {
- return (dummy() || forcePreload()) && reassign();
- }
-
- /**
* @param grpId Cache group ID.
* @param partId Partition ID.
* @return ID of history supplier node or null if it doesn't exist.
@@ -453,6 +343,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt, DiscoCache discoCache) {
assert exchId.equals(this.exchId);
+ this.exchId.discoveryEvent(discoEvt);
this.discoEvt = discoEvt;
this.discoCache = discoCache;
@@ -476,7 +367,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @return {@code True} if deactivate cluster exchange.
*/
- boolean deactivateCluster() {
+ private boolean deactivateCluster() {
return exchActions != null && exchActions.deactivate();
}
@@ -495,13 +386,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * @return Forced Rebalance future.
- */
- @Nullable public GridCompoundFuture<Boolean, Boolean> forcedRebalanceFuture() {
- return forcedRebFut;
- }
-
- /**
* @return {@code true} if entered to busy state.
*/
private boolean enterBusy() {
@@ -538,7 +422,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assert discoEvt != null : this;
assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
- assert !dummy && !forcePreload : this;
try {
discoCache.updateAlives(cctx.discovery());
@@ -553,8 +436,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
boolean crdNode = crd != null && crd.isLocal();
- skipPreload = cctx.kernalContext().clientNode();
-
exchLog.info("Started exchange init [topVer=" + topVer +
", crd=" + crdNode +
", evt=" + discoEvt.type() +
@@ -1325,13 +1206,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) {
- boolean realExchange = !dummy && !forcePreload;
-
if (!done.compareAndSet(false, true))
- return dummy;
+ return false;
if (err == null &&
- realExchange &&
!cctx.kernalContext().clientNode() &&
(serverNodeDiscoveryEvent() || affChangeMsg != null)) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1342,7 +1220,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
- if (err == null && realExchange) {
+ if (err == null) {
if (centralizedAff) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
@@ -1412,14 +1290,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.database().releaseHistoryForExchange();
- if (err == null && realExchange) {
+ if (err == null) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (!grp.isLocal())
grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()));
}
}
- if (super.onDone(res, err) && realExchange) {
+ if (super.onDone(res, err)) {
if (log.isDebugEnabled())
log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this +
", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
@@ -1441,7 +1319,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return true;
}
- return dummy;
+ return false;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a38c46d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 2b18c24..7efd4aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -179,22 +179,21 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
+ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut) {
// No assignments for disabled preloader.
GridDhtPartitionTopology top = grp.topology();
if (!grp.rebalanceEnabled())
- return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+ return new GridDhtPreloaderAssignments(exchId, top.topologyVersion());
int partCnt = grp.affinity().partitions();
- assert exchFut.forcePreload() || exchFut.dummyReassign() ||
- exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
- "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+ assert exchFut == null || exchFut.topologyVersion().equals(top.topologyVersion()) :
+ "Topology version mismatch [exchId=" + exchId +
", grp=" + grp.name() +
", topVer=" + top.topologyVersion() + ']';
- GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+ GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchId, top.topologyVersion());
AffinityTopologyVersion topVer = assigns.topologyVersion();
@@ -204,7 +203,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (ctx.exchange().hasPendingExchange()) {
if (log.isDebugEnabled())
log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
- exchFut.exchangeId());
+ exchId);
assigns.cancelled(true);
@@ -220,7 +219,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
ClusterNode histSupplier = null;
- if (ctx.database().persistenceEnabled()) {
+ if (ctx.database().persistenceEnabled() && exchFut != null) {
UUID nodeId = exchFut.partitionHistorySupplier(grp.groupId(), p);
if (nodeId != null)
@@ -243,7 +242,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (msg == null) {
assigns.put(histSupplier, msg = new GridDhtPartitionDemandMessage(
top.updateSequence(),
- exchFut.exchangeId().topologyVersion(),
+ exchId.topologyVersion(),
grp.groupId()));
}
@@ -292,14 +291,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
top.own(part);
if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
- DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
- grp.addRebalanceEvent(p,
- EVT_CACHE_REBALANCE_PART_DATA_LOST,
- discoEvt.eventNode(),
- discoEvt.type(),
- discoEvt.timestamp());
- }
+ grp.addRebalanceEvent(p,
+ EVT_CACHE_REBALANCE_PART_DATA_LOST,
+ exchId.eventNode(),
+ exchId.event(),
+ exchId.eventTimestamp());
+ }
if (log.isDebugEnabled())
log.debug("Owning partition as there are no other owners: " + part);
@@ -312,7 +309,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (msg == null) {
assigns.put(n, msg = new GridDhtPartitionDemandMessage(
top.updateSequence(),
- exchFut.exchangeId().topologyVersion(),
+ exchId.topologyVersion(),
grp.groupId()));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a38c46d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 3f82c9b..41dd076 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -30,25 +30,24 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode,
/** */
private static final long serialVersionUID = 0L;
- /** Exchange future. */
- @GridToStringExclude
- private final GridDhtPartitionsExchangeFuture exchFut;
+ /** Exchange ID. */
+ private final GridDhtPartitionExchangeId exchangeId;
- /** Last join order. */
+ /** Topology version. */
private final AffinityTopologyVersion topVer;
/** */
private boolean cancelled;
/**
- * @param exchFut Exchange future.
+ * @param exchangeId Exchange ID.
* @param topVer Last join order.
*/
- public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture exchFut, AffinityTopologyVersion topVer) {
- assert exchFut != null;
+ public GridDhtPreloaderAssignments(GridDhtPartitionExchangeId exchangeId, AffinityTopologyVersion topVer) {
+ assert exchangeId != null;
assert topVer.topologyVersion() > 0 : topVer;
- this.exchFut = exchFut;
+ this.exchangeId = exchangeId;
this.topVer = topVer;
}
@@ -69,8 +68,8 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode,
/**
- * @return Exchange future.
+ * @return Exchange ID.
*/
- GridDhtPartitionsExchangeFuture exchangeFuture() {
- return exchFut;
+ GridDhtPartitionExchangeId exchangeId() {
+ return exchangeId;
}
/**
@@ -82,7 +81,7 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode,
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtPreloaderAssignments.class, this, "exchId", exchFut.exchangeId(),
+ return S.toString(GridDhtPreloaderAssignments.class, this, "exchId", exchangeId,
"super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a38c46d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
new file mode 100644
index 0000000..9a76a8e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+
+/**
+ * Exchange worker task holding an exchange ID; by its name, a request to reassign rebalancing for that exchange.
+ */
+public class RebalanceReassignExchangeTask implements CachePartitionExchangeWorkerTask {
+ /** Exchange ID this task was created with. */
+ private final GridDhtPartitionExchangeId exchId;
+
+ /**
+ * @param exchId Exchange ID, must not be {@code null} (checked by assertion).
+ */
+ public RebalanceReassignExchangeTask(GridDhtPartitionExchangeId exchId) {
+ assert exchId != null;
+
+ this.exchId = exchId;
+ }
+
+ /**
+ * @return Exchange ID passed to the constructor.
+ */
+ public GridDhtPartitionExchangeId exchangeId() {
+ return exchId;
+ }
+}