You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/27 10:00:39 UTC
[1/5] ignite git commit: Ignite-1093
Repository: ignite
Updated Branches:
refs/heads/ignite-1093-3 [created] a34a408bf
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 74237f8..edcc18c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -17,15 +17,19 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -47,27 +51,38 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
/**
* DHT cache preloader.
*/
public class GridDhtPreloader extends GridCachePreloaderAdapter {
+ /** Product version parsed from {@code "1.5.0"}; judging by the name, presumably the first version that supports the V2 rebalancing protocol (TODO confirm). */
+ public static final IgniteProductVersion REBALANCING_VER_2_SINCE = IgniteProductVersion.fromString("1.5.0");
+
/** Default preload resend timeout. */
public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
@@ -81,10 +96,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
/** Partition suppliers. */
- private GridDhtPartitionSupplyPool supplyPool;
+ private GridDhtPartitionSupplier supplier;
/** Partition demanders. */
- private GridDhtPartitionDemandPool demandPool;
+ private GridDhtPartitionDemander demander;
/** Start future. */
private GridFutureAdapter<Object> startFut;
@@ -92,10 +107,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** Busy lock to prevent activities from accessing exchanger while it's stopping. */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
+ /** Demand lock. */
+ private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+
/** Pending affinity assignment futures. */
private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
new ConcurrentHashMap8<>();
+ /** Partitions enqueued by {@code evictPartitionAsync} that wait to be polled and evicted by the background eviction closure. */
+ private final Queue<GridDhtLocalPartition> partitionsToEvict = new ConcurrentLinkedDeque8<>();
+
+ /**
+ * Eviction ownership flag: switched {@code 0 -> 1} by CAS by whoever is about to drain
+ * {@code partitionsToEvict} and switched back {@code 1 -> 0} after each polled partition.
+ * NOTE(review): {@code AtomicReference.compareAndSet} compares references, so this works only because the
+ * autoboxed values 0 and 1 come from the shared Integer cache; an {@code AtomicInteger} would not depend on that.
+ */
+ private final AtomicReference<Integer> partitionsEvictionOwning = new AtomicReference<>(0);
+
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -179,8 +203,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
});
- supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
- demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
+ supplier = new GridDhtPartitionSupplier(cctx);
+ demander = new GridDhtPartitionDemander(cctx, busyLock);
+
+ supplier.start();
+ demander.start();
cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
@@ -199,19 +226,16 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
final long startTopVer = loc.order();
topVer.setIfGreater(startTopVer);
-
- supplyPool.start();
- demandPool.start();
}
/** {@inheritDoc} */
@Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
super.preloadPredicate(preloadPred);
- assert supplyPool != null && demandPool != null : "preloadPredicate may be called only after start()";
+ assert supplier != null && demander != null : "preloadPredicate may be called only after start()";
- supplyPool.preloadPredicate(preloadPred);
- demandPool.preloadPredicate(preloadPred);
+ supplier.preloadPredicate(preloadPred);
+ demander.preloadPredicate(preloadPred);
}
/** {@inheritDoc} */
@@ -225,37 +249,104 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
// Acquire write busy lock.
busyLock.writeLock().lock();
- if (supplyPool != null)
- supplyPool.stop();
+ if (supplier != null)
+ supplier.stop();
- if (demandPool != null)
- demandPool.stop();
+ if (demander != null)
+ demander.stop();
top = null;
}
/** {@inheritDoc} */
@Override public void onInitialExchangeComplete(@Nullable Throwable err) {
- if (err == null) {
+ if (err == null)
startFut.onDone();
+ else
+ startFut.onDone(err);
+ }
- final long start = U.currentTimeMillis();
+ /** {@inheritDoc} Simply forwards the exchange future to the partition demander. */
+ @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+ demander.updateLastExchangeFuture(lastFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
+ // No assignments for disabled preloader.
+ GridDhtPartitionTopology top = cctx.dht().topology();
+
+ if (!cctx.rebalanceEnabled())
+ return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+ int partCnt = cctx.affinity().partitions();
+
+ assert exchFut.forcePreload() || exchFut.dummyReassign() ||
+ exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
+ "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+ ", topVer=" + top.topologyVersion() + ']';
+
+ GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+ AffinityTopologyVersion topVer = assigns.topologyVersion();
+
+ for (int p = 0; p < partCnt; p++) {
+ if (cctx.shared().exchange().hasPendingExchange()) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
+ exchFut.exchangeId());
+
+ break;
+ }
- final CacheConfiguration cfg = cctx.config();
+ // If partition belongs to local node.
+ if (cctx.affinity().localNode(p, topVer)) {
+ GridDhtLocalPartition part = top.localPartition(p, topVer, true);
- if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
- U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
+ assert part != null;
+ assert part.id() == p;
- demandPool.syncFuture().listen(new CI1<Object>() {
- @Override public void apply(Object t) {
- U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
- "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
+ if (part.state() != MOVING) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping partition assignment (state is not MOVING): " + part);
+
+ continue; // For.
+ }
+
+ Collection<ClusterNode> picked = pickedOwners(p, topVer);
+
+ if (picked.isEmpty()) {
+ top.own(part);
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+ DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+ cctx.events().addPreloadEvent(p,
+ EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+ discoEvt.type(), discoEvt.timestamp());
}
- });
+
+ if (log.isDebugEnabled())
+ log.debug("Owning partition as there are no other owners: " + part);
+ }
+ else {
+ ClusterNode n = F.rand(picked);
+
+ GridDhtPartitionDemandMessage msg = assigns.get(n);
+
+ if (msg == null) {
+ assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+ top.updateSequence(),
+ exchFut.exchangeId().topologyVersion(),
+ cctx.cacheId()));
+ }
+
+ msg.addPartition(p);
+ }
}
}
- else
- startFut.onDone(err);
+
+ return assigns;
}
/** {@inheritDoc} */
@@ -269,24 +360,77 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
topVer.set(topVer0);
}
- /** {@inheritDoc} */
- @Override public void onExchangeFutureAdded() {
- demandPool.onExchangeFutureAdded();
+ /**
+ * Chooses the remote nodes from which partition {@code p} may be demanded.
+ * <p>
+ * All remote owners are returned when there are no more of them than affinity nodes for the partition.
+ * Otherwise the owners are sorted with {@code CU.nodeComparator(false)} and only the first
+ * {@code affCnt} of them (as many as there are affinity nodes) are returned.
+ *
+ * @param p Partition.
+ * @param topVer Topology version.
+ * @return Picked owners; empty when the partition has no remote owner.
+ */
+ private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
+
+ int affCnt = affNodes.size(); // Upper bound on the number of owners to pick.
+
+ Collection<ClusterNode> rmts = remoteOwners(p, topVer);
+
+ int rmtCnt = rmts.size();
+
+ if (rmtCnt <= affCnt)
+ return rmts; // Nothing to trim.
+
+ List<ClusterNode> sorted = new ArrayList<>(rmts);
+
+ // Sort in descending order, so nodes with higher order will be first.
+ Collections.sort(sorted, CU.nodeComparator(false));
+
+ // Pick newest nodes (the result is a sub-list view of the sorted copy).
+ return sorted.subList(0, affCnt);
+ }
+
+ /**
+ * Returns the owners of partition {@code p} reported by the DHT topology, filtered with
+ * {@code F.remoteNodes(cctx.nodeId())} (presumably this drops the local node - confirm against {@code F}).
+ *
+ * @param p Partition.
+ * @param topVer Topology version.
+ * @return Nodes owning this partition.
+ */
+ private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
+ return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
}
/** {@inheritDoc} */
- @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
- demandPool.updateLastExchangeFuture(lastFut);
+ public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
+ if (!enterBusy())
+ return;
+
+ try {
+ demandLock.readLock().lock();
+ try {
+ demander.handleSupplyMessage(idx, id, s);
+ }
+ finally {
+ demandLock.readLock().unlock();
+ }
+ }
+ finally {
+ leaveBusy();
+ }
}
/** {@inheritDoc} */
- @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
- return demandPool.assign(exchFut);
+ public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
+ if (!enterBusy())
+ return;
+
+ try {
+ supplier.handleDemandMessage(idx, id, d);
+ }
+ finally {
+ leaveBusy();
+ }
}
/** {@inheritDoc} */
- @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
- demandPool.addAssignments(assignments, forcePreload);
+ @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
+ boolean forcePreload, Collection<String> caches, int cnt) {
+ return demander.addAssignments(assignments, forcePreload, caches, cnt);
}
/**
@@ -298,7 +442,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> syncFuture() {
- return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
+ return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
+ }
+
+ /** {@inheritDoc} On a client node an already finished future holding {@code true} is returned; otherwise the demander's rebalance future. */
+ @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
+ return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
}
/**
@@ -581,12 +730,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void forcePreload() {
- demandPool.forcePreload();
+ demander.forcePreload();
}
/** {@inheritDoc} */
@Override public void unwindUndeploys() {
- demandPool.unwindUndeploys();
+ demandLock.writeLock().lock();
+
+ try {
+ cctx.deploy().unwind(cctx);
+ }
+ finally {
+ demandLock.writeLock().unlock();
+ }
}
/**
@@ -607,6 +763,41 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
forceKeyFuts.remove(fut.futureId(), fut);
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The partition is appended to {@code partitionsToEvict}. The caller that wins the {@code 0 -> 1} CAS on
+ * {@code partitionsEvictionOwning} schedules a closure which drains the queue, taking ownership for one
+ * polled partition at a time and calling {@code tryEvict()} on it.
+ * <p>
+ * After ownership is released the queue is checked once more, so a partition enqueued by a caller that lost
+ * the CAS while the drainer was about to exit is not left behind.
+ *
+ * @param part Partition to evict.
+ */
+ @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
+ partitionsToEvict.add(part);
+
+ if (partitionsEvictionOwning.compareAndSet(0, 1)) {
+ cctx.closures().callLocalSafe(new GPC<Boolean>() {
+ @Override public Boolean call() {
+ // Ownership for the first pass was already taken by the scheduling thread.
+ boolean firstRun = true;
+
+ while (true) {
+ if (!firstRun && !partitionsEvictionOwning.compareAndSet(0, 1))
+ return false; // Somebody else owns the eviction now and will drain the queue.
+
+ firstRun = false;
+
+ boolean drained;
+
+ try {
+ GridDhtLocalPartition next = partitionsToEvict.poll();
+
+ drained = next == null;
+
+ if (!drained)
+ next.tryEvict();
+ }
+ finally {
+ boolean res = partitionsEvictionOwning.compareAndSet(1, 0);
+
+ assert res;
+ }
+
+ // Decide to stop only after ownership is released: a producer that enqueued while the flag
+ // was still 1 failed its CAS and scheduled nothing, so its partition must be picked up here.
+ if (drained && partitionsToEvict.isEmpty())
+ return false;
+ }
+ }
+ }, /*system pool*/ true);
+ }
+ }
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 7c5e97c..810bd8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -1292,6 +1292,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
catch (IgniteCheckedException e) {
U.error(log, "Failed to remove count down latch: " + latch0.name(), e);
}
+ finally {
+ ctx.cache().context().txContextReset();
+ }
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 26a41de..9315d7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -696,7 +696,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
if (log.isDebugEnabled())
U.warn(log, "Received response for unknown child job (was job presumed failed?): " + res);
- return;
+ selfOccupied = true;
+
+ continue;
}
// Only process 1st response and ignore following ones. This scenario
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
index 835cdcb..c95a859 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
@@ -239,7 +239,7 @@ public class GridTuple4<V1, V2, V3, V4> implements Iterable<Object>, Externaliza
GridTuple4<?, ?, ?, ?> t = (GridTuple4<?, ?, ?, ?>)o;
- return F.eq(val1, t.val2) && F.eq(val2, t.val2) && F.eq(val3, t.val3) && F.eq(val4, t.val4);
+ return F.eq(val1, t.val1) && F.eq(val2, t.val2) && F.eq(val3, t.val3) && F.eq(val4, t.val4);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 1824339..183838b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -525,6 +525,8 @@ public class GridNioServer<T> {
assert ses instanceof GridSelectorNioSessionImpl;
assert op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ;
+ U.log(log, "Pausing reads");
+
GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
if (impl.closed())
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 6254605..854ce95 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1956,7 +1956,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* <p>
* This method is intended for test purposes only.
*/
- void simulateNodeFailure() {
+ protected void simulateNodeFailure() {
impl.simulateNodeFailure();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 3913957..d868468 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -105,6 +105,9 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
GridTestUtils.setMemoryMode(cfg, ccfg, memMode, 100, 1024);
+ //To be uncommented after https://issues.apache.org/jira/browse/IGNITE-1578 fix.
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
return cfg;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
new file mode 100644
index 0000000..c65a0ed
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
+
+/**
+ * Re-runs the tests inherited from {@code GridCacheRebalancingSyncSelfTest} with every configured cache switched
+ * to {@code CacheRebalanceMode.ASYNC}, and adds a node-failure scenario.
+ */
+public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncSelfTest {
+ /** {@inheritDoc} Takes the parent configuration and sets the rebalance mode of each cache to ASYNC. */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration()) {
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+ }
+
+ return iCfg;
+ }
+
+ /**
+ * Starts node 0, loads data, starts node 1, waits for the rebalance future of the replicated cache on node 1,
+ * then simulates a failure of node 1 and calls {@code checkSupplyContextMapIsEmpty()}.
+ *
+ * @throws Exception If the test fails.
+ */
+ public void testNodeFailedAtRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite, 0, 0);
+
+ log.info("Preloading started.");
+
+ startGrid(1);
+
+ GridDhtPartitionDemander.RebalanceFuture fut = (GridDhtPartitionDemander.RebalanceFuture)grid(1).context().
+ cache().internalCache(CACHE_NAME_DHT_REPLICATED).preloader().rebalanceFuture();
+
+ fut.get(); // Result is ignored; only completion is awaited.
+
+ ((TestTcpDiscoverySpi)grid(1).configuration().getDiscoverySpi()).simulateNodeFailure();
+
+ checkSupplyContextMapIsEmpty();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
new file mode 100644
index 0000000..cea7808
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
+ /** IP finder installed into the discovery SPI of every node by {@code getConfiguration}. */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Number of keys written and checked per cache by the data helpers. */
+ private static int TEST_SIZE = 100_000;
+
+ /** Partitioned cache name. */
+ protected static String CACHE_NAME_DHT_PARTITIONED = "cacheP";
+
+ /** Partitioned cache 2 name. */
+ protected static String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2";
+
+ /** Replicated cache name. */
+ protected static String CACHE_NAME_DHT_REPLICATED = "cacheR";
+
+ /** Replicated cache 2 name. */
+ protected static String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
+
+ /** Stop flag polled by the background load and check threads of {@code testLoadRebalancing}. */
+ private volatile boolean concurrentStartFinished;
+
+ /** Additional stop flag; presumably used by tests further down in this class (TODO confirm). */
+ private volatile boolean concurrentStartFinished2;
+
+ /** Additional stop flag; presumably used by tests further down in this class (TODO confirm). */
+ private volatile boolean concurrentStartFinished3;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Configures discovery with the shared IP finder in forced server mode, marks the grid with index 10 as a
+ * client, and registers four SYNC-rebalanced caches: two partitioned (one backup each) and two replicated,
+ * with differing batch sizes, batch counts and rebalance orders. The rebalance thread pool size is set to 2.
+ */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+ if (getTestGridName(10).equals(gridName)) // Only the grid with index 10 runs in client mode.
+ iCfg.setClientMode(true);
+
+ CacheConfiguration<Integer, Integer> cachePCfg = new CacheConfiguration<>();
+
+ cachePCfg.setName(CACHE_NAME_DHT_PARTITIONED);
+ cachePCfg.setCacheMode(CacheMode.PARTITIONED);
+ cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cachePCfg.setBackups(1);
+ cachePCfg.setRebalanceBatchSize(1); // Smallest batch size.
+ cachePCfg.setRebalanceBatchesCount(1);
+ cachePCfg.setRebalanceOrder(2);
+
+ CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>();
+
+ cachePCfg2.setName(CACHE_NAME_DHT_PARTITIONED_2);
+ cachePCfg2.setCacheMode(CacheMode.PARTITIONED);
+ cachePCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cachePCfg2.setBackups(1);
+ cachePCfg2.setRebalanceOrder(2);
+ //cachePCfg2.setRebalanceDelay(5000);//Known issue, possible deadlock in case of low priority cache rebalancing delayed.
+
+ CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
+
+ cacheRCfg.setName(CACHE_NAME_DHT_REPLICATED);
+ cacheRCfg.setCacheMode(CacheMode.REPLICATED);
+ cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cacheRCfg.setRebalanceBatchSize(1);
+ cacheRCfg.setRebalanceBatchesCount(Integer.MAX_VALUE);
+ ((TcpCommunicationSpi)iCfg.getCommunicationSpi()).setSharedMemoryPort(-1);//Shmem fail fix for Integer.MAX_VALUE.
+
+ CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>();
+
+ cacheRCfg2.setName(CACHE_NAME_DHT_REPLICATED_2);
+ cacheRCfg2.setCacheMode(CacheMode.REPLICATED);
+ cacheRCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cacheRCfg2.setRebalanceOrder(4);
+
+ iCfg.setCacheConfiguration(cachePCfg, cachePCfg2, cacheRCfg, cacheRCfg2);
+
+ iCfg.setRebalanceThreadPoolSize(2);
+
+ return iCfg;
+ }
+
+ /**
+ * Fills all four test caches with {@code TEST_SIZE} entries each, starting at key {@code from}.
+ *
+ * @param ignite Ignite.
+ * @param from First key.
+ * @param iter Iteration number that is added to every stored value.
+ */
+ protected void generateData(Ignite ignite, int from, int iter) {
+ generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter);
+ generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter);
+ generateData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter);
+ generateData(ignite, CACHE_NAME_DHT_REPLICATED_2, from, iter);
+ }
+
+ /**
+ * Puts {@code TEST_SIZE} entries into the given cache. Key {@code i} is mapped to
+ * {@code i + name.hashCode() + iter}, which is the value {@code checkData} expects.
+ *
+ * @param ignite Ignite.
+ * @param name Cache name.
+ * @param from First key (inclusive).
+ * @param iter Iteration number that is added to every stored value.
+ */
+ protected void generateData(Ignite ignite, String name, int from, int iter) {
+ for (int i = from; i < from + TEST_SIZE; i++) {
+ // Progress is measured from the first key so the percentage stays within 0..100 for any 'from'.
+ int done = i - from;
+
+ if (done % (TEST_SIZE / 10) == 0)
+ log.info("Prepared " + done * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
+
+ ignite.cache(name).put(i, i + name.hashCode() + iter);
+ }
+ }
+
+ /**
+ * Checks the contents of all four test caches for keys {@code from .. from + TEST_SIZE - 1}.
+ *
+ * @param ignite Ignite.
+ * @param from First key.
+ * @param iter Iteration number that was used when the data was generated.
+ * @throws IgniteCheckedException Exception.
+ */
+ protected void checkData(Ignite ignite, int from, int iter) throws IgniteCheckedException {
+ checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter);
+ checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter);
+ checkData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter);
+ checkData(ignite, CACHE_NAME_DHT_REPLICATED_2, from, iter);
+ }
+
+ /**
+ * Asserts that every key in {@code from .. from + TEST_SIZE - 1} of the given cache holds
+ * {@code key + name.hashCode() + iter}, the value written by {@code generateData}.
+ *
+ * @param ignite Ignite.
+ * @param name Cache name.
+ * @param from First key (inclusive).
+ * @param iter Iteration number that was used when the data was generated.
+ * @throws IgniteCheckedException Exception.
+ */
+ protected void checkData(Ignite ignite, String name, int from, int iter) throws IgniteCheckedException {
+ for (int i = from; i < from + TEST_SIZE; i++) {
+ // Progress is measured from the first key so the percentage stays within 0..100 for any 'from'.
+ int done = i - from;
+
+ if (done % (TEST_SIZE / 10) == 0)
+ log.info("<" + name + "> Checked " + done * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
+
+ Object exp = i + name.hashCode() + iter;
+
+ // Read once, so the value that is tested is also the value reported on failure.
+ Object val = ignite.cache(name).get(i);
+
+ assert exp.equals(val) : i + " value " + exp + " does not match (" + val + ")";
+ }
+ }
+
+ /**
+ * Loads data on a single node, then starts and stops further nodes one at a time, waiting after every
+ * topology change for rebalancing to finish at the expected topology version (2 to 5), and finally checks
+ * that the one remaining node (grid 1) holds all the data.
+ *
+ * @throws Exception Exception
+ */
+ public void testSimpleRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite, 0, 0);
+
+ log.info("Preloading started.");
+
+ long start = System.currentTimeMillis();
+
+ startGrid(1);
+
+ waitForRebalancing(0, 2);
+ waitForRebalancing(1, 2);
+
+ stopGrid(0);
+
+ waitForRebalancing(1, 3);
+
+ startGrid(2);
+
+ waitForRebalancing(1, 4);
+ waitForRebalancing(2, 4);
+
+ stopGrid(2);
+
+ waitForRebalancing(1, 5);
+
+ long spend = (System.currentTimeMillis() - start) / 1000; // Whole seconds; the data check below is not timed.
+
+ checkData(grid(1), 0, 0);
+
+ log.info("Spend " + spend + " seconds to rebalance entries.");
+ }
+
+ /**
+ * Starts two nodes, loads the partitioned cache, then keeps one thread rewriting the same entries and another
+ * thread checking them while a third node joins and rebalances to topology version 3.
+ * <p>
+ * NOTE(review): {@code checkData} reports mismatches through {@code assert}; an {@code AssertionError} raised on
+ * the checker thread is not propagated to this method, so it looks like such a failure would not fail the
+ * test - confirm.
+ *
+ * @throws Exception Exception
+ */
+ public void testLoadRebalancing() throws Exception {
+ final Ignite ignite = startGrid(0);
+
+ startGrid(1);
+
+ generateData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
+
+ log.info("Preloading started.");
+
+ long start = System.currentTimeMillis();
+
+ concurrentStartFinished = false;
+
+ Thread t1 = new Thread() { // Writer: re-puts the same key/value pairs until told to stop.
+ @Override public void run() {
+ while (!concurrentStartFinished) {
+ for (int i = 0; i < 0 + TEST_SIZE; i++) {
+ if (i % (TEST_SIZE / 10) == 0)
+ log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
+
+ ignite.cache(CACHE_NAME_DHT_PARTITIONED).put(i, i + CACHE_NAME_DHT_PARTITIONED.hashCode() + 0);
+ }
+ }
+ }
+ };
+ Thread t2 = new Thread() { // Reader: repeatedly verifies the whole key range until told to stop.
+ @Override public void run() {
+ while (!concurrentStartFinished) {
+ try {
+ checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
+ }
+ catch (IgniteCheckedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+
+ t1.start();
+ t2.start();
+
+ startGrid(2);
+
+ waitForRebalancing(2, 3);
+
+ concurrentStartFinished = true; // The flag is only tested between full passes, so each thread finishes its current pass first.
+
+ t1.join();
+ t2.join();
+
+ long spend = (System.currentTimeMillis() - start) / 1000;
+
+ log.info("Spend " + spend + " seconds to rebalance entries.");
+ }
+
+ /**
+ * @param id Node id.
+ * @param major Major ver.
+ * @param minor Minor ver.
+ * @throws IgniteCheckedException Exception.
+ */
+ protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException {
+ waitForRebalancing(id, new AffinityTopologyVersion(major, minor));
+ }
+
+ /**
+ * @param id Node id.
+ * @param major Major ver.
+ * @throws IgniteCheckedException Exception.
+ */
+ protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
+ waitForRebalancing(id, new AffinityTopologyVersion(major));
+ }
+
+ /**
+ * @param id Node id.
+ * @param top Topology version.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException {
+ boolean finished = false;
+
+ while (!finished) {
+ finished = true;
+
+ for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) {
+ GridDhtPartitionDemander.RebalanceFuture fut = (GridDhtPartitionDemander.RebalanceFuture)c.preloader().rebalanceFuture();
+ if (fut.topologyVersion() == null || !fut.topologyVersion().equals(top)) {
+ finished = false;
+
+ break;
+ }
+ else if (!fut.get()) {
+ finished = false;
+
+ log.warning("Rebalancing finished with missed partitions.");
+ }
+ }
+ }
+ }
+
+ protected void checkSupplyContextMapIsEmpty() {
+ for (Ignite g : G.allGrids()) {
+ for (GridCacheAdapter c : ((IgniteEx)g).context().cache().internalCaches()) {
+
+ Object supplier = U.field(c.preloader(), "supplier");
+
+ Map map = U.field(supplier, "scMap");
+
+ assert map.isEmpty();
+ }
+ }
+ }
+
+ @Override protected long getTestTimeout() {
+ return 5 * 60_000;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testComplexRebalancing() throws Exception {
+ final Ignite ignite = startGrid(0);
+
+ generateData(ignite, 0, 0);
+
+ log.info("Preloading started.");
+
+ long start = System.currentTimeMillis();
+
+ concurrentStartFinished = false;
+ concurrentStartFinished2 = false;
+ concurrentStartFinished3 = false;
+
+ Thread t1 = new Thread() {
+ @Override public void run() {
+ try {
+ startGrid(1);
+ startGrid(2);
+
+ while (!concurrentStartFinished2) {
+ U.sleep(10);
+ }
+
+ waitForRebalancing(0, 5, 0);
+ waitForRebalancing(1, 5, 0);
+ waitForRebalancing(2, 5, 0);
+ waitForRebalancing(3, 5, 0);
+ waitForRebalancing(4, 5, 0);
+
+ checkSupplyContextMapIsEmpty();
+
+ //New cache should start rebalancing.
+ CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
+
+ cacheRCfg.setName(CACHE_NAME_DHT_PARTITIONED + "_NEW");
+ cacheRCfg.setCacheMode(CacheMode.PARTITIONED);
+ cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+ grid(0).getOrCreateCache(cacheRCfg);
+
+ while (!concurrentStartFinished3) {
+ U.sleep(10);
+ }
+
+ concurrentStartFinished = true;
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ Thread t2 = new Thread() {
+ @Override public void run() {
+ try {
+ startGrid(3);
+ startGrid(4);
+
+ concurrentStartFinished2 = true;
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ Thread t3 = new Thread() {
+ @Override public void run() {
+ generateData(ignite, 0, 1);
+
+ concurrentStartFinished3 = true;
+ }
+ };
+
+ t1.start();
+ t2.start();// Should cancel t1 rebalancing.
+ t3.start();
+
+ t1.join();
+ t2.join();
+ t3.join();
+
+ waitForRebalancing(0, 5, 1);
+ waitForRebalancing(1, 5, 1);
+ waitForRebalancing(2, 5, 1);
+ waitForRebalancing(3, 5, 1);
+ waitForRebalancing(4, 5, 1);
+
+ checkSupplyContextMapIsEmpty();
+
+ checkData(grid(4), 0, 1);
+
+ final Ignite ignite3 = grid(3);
+
+ Thread t4 = new Thread() {
+ @Override public void run() {
+ generateData(ignite3, 0, 2);
+
+ }
+ };
+
+ t4.start();
+
+ stopGrid(0);
+
+ waitForRebalancing(1, 6);
+ waitForRebalancing(2, 6);
+ waitForRebalancing(3, 6);
+ waitForRebalancing(4, 6);
+
+ stopGrid(1);
+
+ waitForRebalancing(2, 7);
+ waitForRebalancing(3, 7);
+ waitForRebalancing(4, 7);
+
+ stopGrid(2);
+
+ waitForRebalancing(3, 8);
+ waitForRebalancing(4, 8);
+
+ t4.join();
+
+ stopGrid(3);
+
+ waitForRebalancing(4, 9);
+
+ checkSupplyContextMapIsEmpty();
+
+ long spend = (System.currentTimeMillis() - start) / 1000;
+
+ checkData(grid(4), 0, 2);
+
+ log.info("Spend " + spend + " seconds to rebalance entries.");
+ }
+
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index c4ad169..64f1495 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -142,26 +142,6 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception If failed.
- */
- public void testSingleZeroPoolSize() throws Exception {
- preloadMode = SYNC;
- poolSize = 0;
-
- try {
- startGrid(1);
-
- assert false : "Grid should have been failed to start.";
- }
- catch (IgniteCheckedException e) {
- info("Caught expected exception: " + e);
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
* @throws Exception If test failed.
*/
public void testIntegrity() throws Exception {
@@ -602,4 +582,4 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
// No-op.
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 0280e9c..51d8a2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -511,23 +511,6 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
- *
- */
- private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
- /** */
- private boolean ignorePingResponse;
-
- /** {@inheritDoc} */
- protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
- IgniteCheckedException {
- if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
- return;
- else
- super.writeToSocket(sock, msg, timeout);
- }
- }
-
- /**
* @throws Exception If any error occurs.
*/
public void testNodeAdded() throws Exception {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
new file mode 100644
index 0000000..dbc54bc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.net.Socket;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
+
+/**
+ *
+ */
+public class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** */
+ public boolean ignorePingResponse;
+
+ /** {@inheritDoc} */
+ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+ IgniteCheckedException {
+ if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
+ return;
+ else
+ super.writeToSocket(sock, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void simulateNodeFailure() {
+ super.simulateNodeFailure();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index d133a84..41d4b4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -77,6 +77,7 @@ import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -1228,7 +1229,7 @@ public abstract class GridAbstractTest extends TestCase {
cfg.setCommunicationSpi(commSpi);
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+ TcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
if (isDebug()) {
discoSpi.setMaxMissedHeartbeats(Integer.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index e4c2129..a228a9a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -482,7 +482,9 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
if (i == 0)
start = System.currentTimeMillis();
- if (System.currentTimeMillis() - start > 30_000)
+ if (System.currentTimeMillis() - start > 30_000) {
+ U.dumpThreads(log);
+
throw new IgniteException("Timeout of waiting for topology map update [" +
"grid=" + g.name() +
", cache=" + cfg.getName() +
@@ -491,6 +493,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
", p=" + p +
", readVer=" + readyVer +
", locNode=" + g.cluster().localNode() + ']');
+ }
Thread.sleep(200); // Busy wait.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index 02a7f7f..240cc68 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -46,6 +46,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNea
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearPartitionedP2PEnabledByteArrayValuesSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePutArrayValueSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicGetAndTransformStoreSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicApiTest;
@@ -85,6 +87,9 @@ public class IgniteCacheTestSuite3 extends TestSuite {
public static TestSuite suite() throws Exception {
TestSuite suite = new TestSuite("IgniteCache Test Suite part 3");
+ suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class);
+ suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
+
// Value consistency tests.
suite.addTestSuite(GridCacheValueConsistencyAtomicSelfTest.class);
suite.addTestSuite(GridCacheValueConsistencyAtomicPrimaryWriteOrderSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
index 0226046..582bfe3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
@@ -107,25 +107,33 @@ public class GridOrderedMessageCancelSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testTask() throws Exception {
+ Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap");
+
+ int initSize = map.size();
+
ComputeTaskFuture<?> fut = executeAsync(compute(grid(0).cluster().forRemotes()), Task.class, null);
- testMessageSet(fut);
+ testMessageSet(fut, initSize, map);
}
/**
* @throws Exception If failed.
*/
public void testTaskException() throws Exception {
+ Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap");
+
+ int initSize = map.size();
+
ComputeTaskFuture<?> fut = executeAsync(compute(grid(0).cluster().forRemotes()), FailTask.class, null);
- testMessageSet(fut);
+ testMessageSet(fut, initSize, map);
}
/**
* @param fut Future to cancel.
* @throws Exception If failed.
*/
- private void testMessageSet(IgniteFuture<?> fut) throws Exception {
+ private void testMessageSet(IgniteFuture<?> fut, int initSize, Map map) throws Exception {
cancelLatch.await();
assertTrue(fut.cancel());
@@ -134,11 +142,9 @@ public class GridOrderedMessageCancelSelfTest extends GridCommonAbstractTest {
assertTrue(U.await(finishLatch, 5000, MILLISECONDS));
- Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap");
-
info("Map: " + map);
- assertTrue(map.isEmpty());
+ assertEquals(map.size(), initSize);
}
/**
[5/5] ignite git commit: Ignite-1093
Posted by sb...@apache.org.
Ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a34a408b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a34a408b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a34a408b
Branch: refs/heads/ignite-1093-3
Commit: a34a408bf4736aca2879207037ba3bfe5d87de82
Parents: 1334e77
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Oct 27 12:00:13 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Oct 27 12:00:13 2015 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 50 +-
.../configuration/IgniteConfiguration.java | 30 +-
.../apache/ignite/internal/IgniteKernal.java | 10 +
.../org/apache/ignite/internal/IgnitionEx.java | 3 +
.../communication/GridIoMessageFactory.java | 10 +-
.../processors/cache/GridCacheIoManager.java | 9 +
.../processors/cache/GridCacheMapEntry.java | 38 +-
.../GridCachePartitionExchangeManager.java | 157 ++-
.../processors/cache/GridCachePreloader.java | 43 +-
.../cache/GridCachePreloaderAdapter.java | 35 +-
.../processors/cache/GridCacheProcessor.java | 54 +-
.../distributed/dht/GridDhtCacheEntry.java | 5 +-
.../distributed/dht/GridDhtLocalPartition.java | 60 +-
.../GridDhtPartitionDemandMessage.java | 9 +-
.../preloader/GridDhtPartitionDemandPool.java | 1192 ----------------
.../dht/preloader/GridDhtPartitionDemander.java | 1310 ++++++++++++++++++
.../dht/preloader/GridDhtPartitionSupplier.java | 999 +++++++++++++
.../GridDhtPartitionSupplyMessageV2.java | 404 ++++++
.../preloader/GridDhtPartitionSupplyPool.java | 555 --------
.../dht/preloader/GridDhtPreloader.java | 269 +++-
.../datastructures/DataStructuresProcessor.java | 3 +
.../processors/task/GridTaskWorker.java | 4 +-
.../ignite/internal/util/lang/GridTuple4.java | 2 +-
.../ignite/internal/util/nio/GridNioServer.java | 2 +
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
...eAtomicInvalidPartitionHandlingSelfTest.java | 3 +
.../GridCacheRebalancingAsyncSelfTest.java | 63 +
.../GridCacheRebalancingSyncSelfTest.java | 472 +++++++
.../GridCacheReplicatedPreloadSelfTest.java | 22 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 17 -
.../spi/discovery/tcp/TestTcpDiscoverySpi.java | 46 +
.../testframework/junits/GridAbstractTest.java | 3 +-
.../junits/common/GridCommonAbstractTest.java | 5 +-
.../testsuites/IgniteCacheTestSuite3.java | 5 +
.../tcp/GridOrderedMessageCancelSelfTest.java | 18 +-
35 files changed, 3946 insertions(+), 1963 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 6ac2b64..4012792 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -73,6 +73,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default rebalance timeout (ms).*/
public static final long DFLT_REBALANCE_TIMEOUT = 10000;
+ /** Default rebalance batches count. */
+ public static final long DFLT_REBALANCE_BATCHES_COUNT = 2;
+
/** Time in milliseconds to wait between rebalance messages to avoid overloading CPU. */
public static final long DFLT_REBALANCE_THROTTLE = 0;
@@ -256,6 +259,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Off-heap memory size. */
private long offHeapMaxMem = DFLT_OFFHEAP_MEMORY;
+ /** Rebalance batches count. */
+ private long rebalanceBatchesCount = DFLT_REBALANCE_BATCHES_COUNT;
+
/** */
private boolean swapEnabled = DFLT_SWAP_ENABLED;
@@ -396,6 +402,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
rebalanceDelay = cc.getRebalanceDelay();
rebalanceOrder = cc.getRebalanceOrder();
rebalancePoolSize = cc.getRebalanceThreadPoolSize();
+ rebalanceBatchesCount = cc.getRebalanceBatchesCount();
rebalanceTimeout = cc.getRebalanceTimeout();
rebalanceThrottle = cc.getRebalanceThrottle();
readFromBackup = cc.isReadFromBackup();
@@ -1033,10 +1040,10 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
* {@link CacheRebalanceMode#SYNC SYNC} or {@link CacheRebalanceMode#ASYNC ASYNC} rebalance modes only.
* <p/>
* If cache rebalance order is positive, rebalancing for this cache will be started only when rebalancing for
- * all caches with smaller rebalance order (except caches with rebalance order {@code 0}) will be completed.
+ * all caches with smaller rebalance order will be completed.
* <p/>
* Note that cache with order {@code 0} does not participate in ordering. This means that cache with
- * rebalance order {@code 1} will never wait for any other caches. All caches with order {@code 0} will
+ * rebalance order {@code 0} will never wait for any other caches. All caches with order {@code 0} will
* be rebalanced right away concurrently with each other and ordered rebalance processes.
* <p/>
* If not set, cache order is 0, i.e. rebalancing is not ordered.
@@ -1079,7 +1086,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
* @return {@code this} for chaining.
*/
public CacheConfiguration<K, V> setRebalanceBatchSize(int rebalanceBatchSize) {
- this.rebalanceBatchSize = rebalanceBatchSize;
+ this.rebalanceBatchSize = Math.max(1, rebalanceBatchSize);
return this;
}
@@ -1269,11 +1276,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return this;
}
+ @Deprecated
/**
- * Gets size of rebalancing thread pool. Note that size serves as a hint and implementation
- * may create more threads for rebalancing than specified here (but never less threads).
- * <p>
- * Default value is {@link #DFLT_REBALANCE_THREAD_POOL_SIZE}.
+ * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
*
* @return Size of rebalancing thread pool.
*/
@@ -1281,9 +1286,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return rebalancePoolSize;
}
+ @Deprecated
/**
- * Sets size of rebalancing thread pool. Note that size serves as a hint and implementation may create more threads
- * for rebalancing than specified here (but never less threads).
+ * Use {@link IgniteConfiguration#setRebalanceThreadPoolSize(int)} instead.
*
* @param rebalancePoolSize Size of rebalancing thread pool.
* @return {@code this} for chaining.
@@ -1773,6 +1778,33 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
}
/**
+ * To gain better rebalancing performance supplier node can provide more than one batch at start and provide
+ * one new to each next demand request.
+ *
+ * Gets number of batches generated by supply node at rebalancing start.
+ *
+ * @return batches count
+ */
+ public long getRebalanceBatchesCount() {
+ return rebalanceBatchesCount;
+ }
+
+ /**
+ * To gain better rebalancing performance supplier node can provide more than one batch at start and provide
+ * one new to each next demand request.
+ *
+ * Sets number of batches generated by supply node at rebalancing start.
+ *
+ * @param rebalanceBatchesCnt batches count.
+ * @return {@code this} for chaining.
+ */
+ public CacheConfiguration<K, V> setRebalanceBatchesCount(long rebalanceBatchesCnt) {
+ this.rebalanceBatchesCount = rebalanceBatchesCnt;
+
+ return this;
+ }
+
+ /**
* Gets cache store session listener factories.
*
* @return Cache store session listener factories.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index ecae356..26145e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -149,6 +149,9 @@ public class IgniteConfiguration {
/** Default keep alive time for public thread pool. */
public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
+ /** Default limit of threads used at rebalance. */
+ public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 2;
+
/** Default max queue capacity of public thread pool. */
public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
@@ -354,6 +357,9 @@ public class IgniteConfiguration {
/** Client mode flag. */
private Boolean clientMode;
+ /** Rebalance thread pool size. */
+ private int rebalanceThreadPoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE;
+
/** Transactions configuration. */
private TransactionConfiguration txCfg = new TransactionConfiguration();
@@ -518,6 +524,7 @@ public class IgniteConfiguration {
utilityCachePoolSize = cfg.getUtilityCacheThreadPoolSize();
waitForSegOnStart = cfg.isWaitForSegmentOnStart();
warmupClos = cfg.getWarmupClosure();
+ rebalanceThreadPoolSize = cfg.getRebalanceThreadPoolSize();
}
/**
@@ -1331,6 +1338,27 @@ public class IgniteConfiguration {
}
/**
+ * Gets the maximum number of threads that can be used for rebalancing.
+ * Minimum is 1.
+ * @return count.
+ */
+ public int getRebalanceThreadPoolSize(){
+ return Math.max(1, rebalanceThreadPoolSize);
+ }
+
+ /**
+ * Sets the maximum number of threads that can be used for rebalancing.
+ * Minimum is 1.
+ * @param size Size.
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setRebalanceThreadPoolSize(int size){
+ this.rebalanceThreadPoolSize = size;
+
+ return this;
+ }
+
+ /**
* Returns a collection of life-cycle beans. These beans will be automatically
* notified of grid life-cycle events. Use life-cycle beans whenever you
* want to perform certain logic before and after grid startup and stopping
@@ -2383,4 +2411,4 @@ public class IgniteConfiguration {
@Override public String toString() {
return S.toString(IgniteConfiguration.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c02dc59..da8cf3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -733,6 +733,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ackEnvironmentVariables();
ackCacheConfiguration();
ackP2pConfiguration();
+ ackRebalanceConfiguration();
// Run background network diagnostics.
GridDiagnostic.runBackgroundCheck(gridName, execSvc, log);
@@ -2135,6 +2136,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/**
*
*/
+ private void ackRebalanceConfiguration() throws IgniteCheckedException {
+ if (cfg.getSystemThreadPoolSize() <= cfg.getRebalanceThreadPoolSize())
+ throw new IgniteCheckedException("Rebalance thread pool size exceed or equals System thread pool size. " +
+ "Change IgniteConfiguration.rebalanceThreadPoolSize property before next start.");
+ }
+
+ /**
+ *
+ */
private void ackCacheConfiguration() {
CacheConfiguration[] cacheCfgs = cfg.getCacheConfiguration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 02b28c5..7d2b2dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -2035,6 +2035,7 @@ public class IgnitionEx {
cache.setAffinity(new RendezvousAffinityFunction(false, 20));
cache.setNodeFilter(CacheConfiguration.ALL_NODES);
cache.setStartSize(300);
+ cache.setRebalanceOrder(-2);//Prior to other system caches.
return cache;
}
@@ -2055,6 +2056,7 @@ public class IgnitionEx {
cache.setWriteSynchronizationMode(FULL_SYNC);
cache.setAffinity(new RendezvousAffinityFunction(false, 100));
cache.setNodeFilter(CacheConfiguration.ALL_NODES);
+ cache.setRebalanceOrder(-1);//Prior to user caches.
return cache;
}
@@ -2075,6 +2077,7 @@ public class IgnitionEx {
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setCacheMode(cfg.getCacheMode());
ccfg.setNodeFilter(CacheConfiguration.ALL_NODES);
+ ccfg.setRebalanceOrder(-1);//Prior to user caches.
if (cfg.getCacheMode() == PARTITIONED)
ccfg.setBackups(cfg.getBackups());
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 079015c..ae8c753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
@@ -684,7 +685,12 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- // [-3..112] - this
+ case 114:
+ msg = new GridDhtPartitionSupplyMessageV2();
+
+ break;
+
+ // [-3..114] - this
// [120..123] - DR
// [-4..-22] - SQL
default:
@@ -722,4 +728,4 @@ public class GridIoMessageFactory implements MessageFactory {
CUSTOM.put(type, c);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 476a96c..1fe55d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -483,6 +484,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
+ case 114: {
+ GridDhtPartitionSupplyMessageV2 req = (GridDhtPartitionSupplyMessageV2)msg;
+
+ U.error(log, "Supply message v2 cannot be unmarshalled.", req.classError());
+ }
+
+ break;
+
default:
throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
+ msg + "]");
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4bf0aa1..4e92ed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -453,7 +453,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (cctx.swap().offheapSwapEvict(key, entry, partition(), evictVer)) {
assert !hasValueUnlocked() : this;
- obsolete = markObsolete0(obsoleteVer, false);
+ obsolete = markObsolete0(obsoleteVer, false, null);
assert obsolete : this;
}
@@ -1303,7 +1303,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
synchronized (this) {
// If entry is still removed.
if (newVer == ver) {
- if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true))) {
+ if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) {
if (log.isDebugEnabled())
log.debug("Entry could not be marked obsolete (it is still used): " + this);
}
@@ -2420,7 +2420,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
if ((!hasReaders() || readers)) {
// markObsolete will clear the value.
- if (!(marked = markObsolete0(ver, true))) {
+ if (!(marked = markObsolete0(ver, true, null))) {
if (log.isDebugEnabled())
log.debug("Entry could not be marked obsolete (it is still used): " + this);
@@ -2478,7 +2478,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean obsolete;
synchronized (this) {
- obsolete = markObsolete0(ver, true);
+ obsolete = markObsolete0(ver, true, null);
}
if (obsolete)
@@ -2511,7 +2511,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
else
- obsolete = markObsolete0(ver, true);
+ obsolete = markObsolete0(ver, true, null);
}
}
}
@@ -2539,7 +2539,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (!this.ver.equals(ver))
return false;
- marked = markObsolete0(ver, true);
+ marked = markObsolete0(ver, true, null);
}
if (marked)
@@ -2555,9 +2555,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
*
* @param ver Version.
* @param clear {@code True} to clear.
+ * @param extras Predefined extras.
* @return {@code True} if entry is obsolete, {@code false} if entry is still used by other threads or nodes.
*/
- protected final boolean markObsolete0(GridCacheVersion ver, boolean clear) {
+ protected final boolean markObsolete0(GridCacheVersion ver, boolean clear, GridCacheObsoleteEntryExtras extras) {
assert Thread.holdsLock(this);
GridCacheVersion obsoleteVer = obsoleteVersionExtras();
@@ -2572,7 +2573,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (mvcc == null || mvcc.isEmpty(ver)) {
obsoleteVer = ver;
- obsoleteVersionExtras(obsoleteVer);
+ obsoleteVersionExtras(obsoleteVer, extras);
if (clear)
value(null);
@@ -2896,7 +2897,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
synchronized (this) {
if (checkExpired()) {
- rmv = markObsolete0(cctx.versions().next(this.ver), true);
+ rmv = markObsolete0(cctx.versions().next(this.ver), true, null);
return null;
}
@@ -3366,7 +3367,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
else {
- if (markObsolete0(obsoleteVer, true))
+ if (markObsolete0(obsoleteVer, true, null))
obsolete = true; // Success, will return "true".
}
}
@@ -3688,7 +3689,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject prev = saveOldValueUnlocked(false);
- if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+ if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) {
if (swap) {
if (!isStartVersion()) {
try {
@@ -3736,7 +3737,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject prevVal = saveValueForIndexUnlocked();
- if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+ if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) {
if (swap) {
if (!isStartVersion()) {
try {
@@ -3812,7 +3813,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridCacheBatchSwapEntry ret = null;
try {
- if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+ if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) {
if (!isStartVersion() && hasValueUnlocked()) {
if (cctx.offheapTiered() && hasOffHeapPointer()) {
if (cctx.swap().offheapEvictionEnabled())
@@ -3871,7 +3872,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return false;
if (checkExpired()) {
- rmv = markObsolete0(cctx.versions().next(this.ver), true);
+ rmv = markObsolete0(cctx.versions().next(this.ver), true, null);
return false;
}
@@ -3984,9 +3985,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @param obsoleteVer Obsolete version.
+ * @param ext Predefined extras; installed as-is only when the entry has no extras yet and {@code obsoleteVer} is not {@code null}, otherwise ignored.
*/
- protected void obsoleteVersionExtras(@Nullable GridCacheVersion obsoleteVer) {
- extras = (extras != null) ? extras.obsoleteVersion(obsoleteVer) : obsoleteVer != null ?
- new GridCacheObsoleteEntryExtras(obsoleteVer) : null;
+ protected void obsoleteVersionExtras(@Nullable GridCacheVersion obsoleteVer, GridCacheObsoleteEntryExtras ext) {
+ extras = (extras != null) ?
+ extras.obsoleteVersion(obsoleteVer) :
+ obsoleteVer != null ?
+ (ext != null) ? ext : new GridCacheObsoleteEntryExtras(obsoleteVer) :
+ null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index adc2174..6793f9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,12 +21,15 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Queue;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,9 +52,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -65,8 +70,10 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
@@ -75,6 +82,7 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
@@ -85,6 +93,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
@@ -132,6 +141,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private GridFutureAdapter<?> reconnectExchangeFut;
+ /** */
+ private final Queue<Callable<Boolean>> rebalancingQueue = new ConcurrentLinkedDeque8<>();
+
/**
* Partition map futures.
* This set also contains already completed exchange futures to address race conditions when coordinator
@@ -309,6 +321,34 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchWorker.futQ.addFirst(fut);
+ if (!cctx.kernalContext().clientNode()) {
+
+ for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+ final int idx = cnt;
+
+ cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() {
+ @Override public void apply(final UUID id, final GridCacheMessage m) {
+ if (!enterBusy())
+ return;
+
+ try {
+ if (m instanceof GridDhtPartitionSupplyMessageV2)
+ cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(
+ idx, id, (GridDhtPartitionSupplyMessageV2)m);
+ else if (m instanceof GridDhtPartitionDemandMessage)
+ cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(
+ idx, id, (GridDhtPartitionDemandMessage)m);
+ else
+ log.error("Unsupported message type: " + m.getClass().getName());
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+ });
+ }
+ }
+
new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
if (reconnect) {
@@ -368,6 +408,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
+ /**
+ * @param idx Index of the rebalance topic; callers in this change pass values from 0 up to the configured rebalance thread pool size (exclusive).
+ * @return Topic object built as {@code TOPIC_CACHE.topic("Rebalance", idx)}.
+ */
+ public static Object rebalanceTopic(int idx) {
+ return TOPIC_CACHE.topic("Rebalance", idx);
+ }
+
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
cctx.gridEvents().removeLocalEventListener(discoLsnr);
@@ -392,6 +440,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (AffinityReadyFuture f : readyFuts.values())
f.onDone(stopErr);
+ if (!cctx.kernalContext().clientNode())
+ for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+ cctx.io().removeOrderedHandler(rebalanceTopic(cnt));
+ }
+
U.cancel(exchWorker);
if (log.isDebugEnabled())
@@ -1103,9 +1156,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean startEvtFired = false;
+ int cnt = 0;
+
+ IgniteInternalFuture asyncStartFut = null;
+
while (!isCancelled()) {
GridDhtPartitionsExchangeFuture exchFut = null;
+ cnt++;
+
try {
boolean preloadFinished = true;
@@ -1220,12 +1279,106 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
if (assignsMap != null) {
+ rebalancingQueue.clear();
+
+ NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
+
for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
int cacheId = e.getKey();
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
- cacheCtx.preloader().addAssignments(e.getValue(), forcePreload);
+ int order = cacheCtx.config().getRebalanceOrder();
+
+ if (orderMap.get(order) == null)
+ orderMap.put(order, new LinkedList<Integer>());
+
+ orderMap.get(order).add(cacheId);
+ }
+
+ Callable<Boolean> marsR = null;
+
+ //Ordered rebalance scheduling.
+ for (Integer order : orderMap.keySet()) {
+ for (Integer cacheId : orderMap.get(order)) {
+ GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+ List<String> waitList = new LinkedList<>();
+
+ for (List<Integer> cIds : orderMap.headMap(order).values()) {
+ for (Integer cId : cIds) {
+ waitList.add(cctx.cacheContext(cId).name());
+ }
+ }
+
+ Callable<Boolean> r = cacheCtx.preloader().addAssignments(
+ assignsMap.get(cacheId), forcePreload, waitList, cnt);
+
+ if (r != null) {
+ U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
+ ", waitList=" + waitList.toString() + "]");
+
+ if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
+ marsR = r;
+ else
+ rebalancingQueue.add(r);
+ }
+ }
+ }
+
+ if (asyncStartFut != null)
+ asyncStartFut.get(); // Wait for thread stop.
+
+ if (marsR != null || !rebalancingQueue.isEmpty()) {
+ if (futQ.isEmpty()) {
+ U.log(log, "Starting caches rebalancing [top=" + exchFut.topologyVersion() + "]");
+
+ if (marsR != null)
+ try {
+ marsR.call();//Marshaller cache rebalancing launches in sync way.
+ }
+ catch (Exception ex) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send initial demand request to node");
+
+ continue;
+ }
+
+ final GridFutureAdapter fut = new GridFutureAdapter();
+
+ asyncStartFut = fut;
+
+ cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
+ @Override public Boolean call() {
+ try {
+ while (true) {
+ Callable<Boolean> r = rebalancingQueue.poll();
+
+ if (r == null)
+ return false;
+
+ if (!r.call())
+ return false;
+ }
+ }
+ catch (Exception ex) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send initial demand request to node");
+
+ return false;
+ }
+ finally {
+ fut.onDone();
+ }
+ }
+ }, /*system pool*/ true);
+ }
+ else {
+ U.log(log, "Obsolete exchange, skipping rebalancing [top=" + exchFut.topologyVersion() + "]");
+ }
+ }
+ else {
+ U.log(log, "Nothing scheduled, skipping rebalancing [top=" + exchFut.topologyVersion() + "]");
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 755958e..79861a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -18,9 +18,14 @@
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.lang.IgnitePredicate;
@@ -90,8 +95,11 @@ public interface GridCachePreloader {
*
* @param assignments Assignments to add.
* @param forcePreload Force preload flag.
+ * @param caches Names of caches whose rebalancing must finish before rebalancing of this cache starts.
+ * @param cnt Counter supplied by the caller (the exchange worker passes its loop iteration number).
*/
- public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
+ public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
+ Collection<String> caches, int cnt);
/**
* @param p Preload predicate.
@@ -115,6 +123,11 @@ public interface GridCachePreloader {
public IgniteInternalFuture<?> syncFuture();
/**
+ * @return Future which will complete when preloading is finished on current topology.
+ */
+ public IgniteInternalFuture<Boolean> rebalanceFuture();
+
+ /**
* Requests that preloader sends the request for the key.
*
* @param keys Keys to request.
@@ -132,4 +145,30 @@ public interface GridCachePreloader {
* Unwinds undeploys.
*/
public void unwindUndeploys();
-}
\ No newline at end of file
+
+
+ /**
+ * Handles Supply message.
+ *
+ * @param idx Index of the rebalance topic the message was received on.
+ * @param id Node Id.
+ * @param s Supply message.
+ */
+ public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s);
+
+ /**
+ * Handles Demand message.
+ *
+ * @param idx Index of the rebalance topic the message was received on.
+ * @param id Node Id.
+ * @param d Demand message.
+ */
+ public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d);
+
+ /**
+ * Evicts partition asynchronously.
+ *
+ * @param part Partition.
+ */
+ public void evictPartitionAsync(GridDhtLocalPartition part);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 5405449..b784383 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -18,11 +18,16 @@
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -36,7 +41,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
/** Cache context. */
protected final GridCacheContext<?, ?> cctx;
- /** Logger.*/
+ /** Logger. */
protected final IgniteLogger log;
/** Affinity. */
@@ -113,12 +118,28 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
+ return new GridFinishedFuture<>(true);
+ }
+
+ /** {@inheritDoc} */
@Override public void unwindUndeploys() {
cctx.deploy().unwind(cctx);
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+ @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys,
+ AffinityTopologyVersion topVer) {
return new GridFinishedFuture<>();
}
@@ -143,7 +164,13 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+ @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
+ Collection<String> caches, int cnt) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
// No-op.
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 722e570..c2acd99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -31,9 +31,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
-import java.util.NavigableMap;
import java.util.Set;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -99,7 +97,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManag
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -162,12 +159,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Map of proxies. */
private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies;
- /** Map of preload finish futures grouped by preload order. */
- private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts;
-
- /** Maximum detected rebalance order. */
- private int maxRebalanceOrder;
-
/** Caches stop sequence. */
private final Deque<String> stopSeq;
@@ -209,7 +200,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
caches = new ConcurrentHashMap<>();
jCacheProxies = new ConcurrentHashMap<>();
- preloadFuts = new TreeMap<>();
stopSeq = new LinkedList<>();
}
@@ -392,10 +382,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" +
U.maskName(cc.getName()) + ']');
- if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) {
- assertParameter(cc.getRebalanceThreadPoolSize() > 0, "rebalanceThreadPoolSize > 0");
+ if (cc.getRebalanceMode() != CacheRebalanceMode.NONE)
assertParameter(cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0");
- }
if (cc.getCacheMode() == PARTITIONED || cc.getCacheMode() == REPLICATED) {
if (cc.getAtomicityMode() == ATOMIC && cc.getWriteSynchronizationMode() == FULL_ASYNC)
@@ -614,8 +602,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
"Deployment mode for cache is not CONTINUOUS or SHARED.");
}
- maxRebalanceOrder = validatePreloadOrder(ctx.config().getCacheConfiguration());
-
ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class,
new CustomEventListener<DynamicCacheChangeBatch>() {
@Override public void onCustomEvent(ClusterNode snd,
@@ -846,31 +832,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
mgr.onKernalStart(false);
- for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
- GridCacheAdapter cache = e.getValue();
-
- if (maxRebalanceOrder > 0) {
- CacheConfiguration cfg = cache.configuration();
-
- int order = cfg.getRebalanceOrder();
-
- if (order > 0 && order != maxRebalanceOrder && cfg.getCacheMode() != LOCAL) {
- GridCompoundFuture fut = (GridCompoundFuture)preloadFuts.get(order);
-
- if (fut == null) {
- fut = new GridCompoundFuture<>();
-
- preloadFuts.put(order, fut);
- }
-
- fut.add(cache.preloader().syncFuture());
- }
- }
- }
-
- for (IgniteInternalFuture<?> fut : preloadFuts.values())
- ((GridCompoundFuture<Object, Object>)fut).markInitialized();
-
for (GridCacheAdapter<?, ?> cache : caches.values())
onKernalStart(cache);
@@ -2779,19 +2740,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * Gets preload finish future for preload-ordered cache with given order. I.e. will get compound preload future
- * with maximum order less than {@code order}.
- *
- * @param order Cache order.
- * @return Compound preload future or {@code null} if order is minimal order found.
- */
- @Nullable public IgniteInternalFuture<?> orderedPreloadFuture(int order) {
- Map.Entry<Integer, IgniteInternalFuture<?>> entry = preloadFuts.lowerEntry(order);
-
- return entry == null ? null : entry.getValue();
- }
-
- /**
* @param spaceName Space name.
* @param keyBytes Key bytes.
* @param valBytes Value bytes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index be2f3d3..b2279ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
+import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
@@ -539,7 +540,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
* @return {@code True} if entry was not being used, passed the filter and could be removed.
* @throws IgniteCheckedException If failed to remove from swap.
*/
- public boolean clearInternal(GridCacheVersion ver, boolean swap) throws IgniteCheckedException {
+ public boolean clearInternal(GridCacheVersion ver, boolean swap, GridCacheObsoleteEntryExtras extras) throws IgniteCheckedException {
boolean rmv = false;
try {
@@ -548,7 +549,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
// Call markObsolete0 to avoid recursive calls to clear if
// we are clearing dht local partition (onMarkedObsolete should not be called).
- if (!markObsolete0(ver, false)) {
+ if (!markObsolete0(ver, false, extras)) {
if (log.isDebugEnabled())
log.debug("Entry could not be marked obsolete (it is still used or has readers): " + this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 4f124e6..b3c13a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -17,6 +17,17 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicStampedReference;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -27,10 +38,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.GridCircularBuffer;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -38,7 +49,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -46,18 +56,6 @@ import org.jetbrains.annotations.NotNull;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.LongAdder8;
-import javax.cache.CacheException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicStampedReference;
-import java.util.concurrent.locks.ReentrantLock;
-
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
@@ -286,7 +284,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
}
// Attempt to evict.
- tryEvict(true);
+ cctx.preloader().evictPartitionAsync(this);
}
/**
@@ -411,7 +409,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
// Decrement reservations.
if (state.compareAndSet(s, s, reservations, --reservations)) {
- tryEvict(true);
+ cctx.preloader().evictPartitionAsync(this);
break;
}
@@ -479,7 +477,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
* @param updateSeq Update sequence.
* @return Future for evict attempt.
*/
- IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) {
+ void tryEvictAsync(boolean updateSeq) {
if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
state.compareAndSet(RENTING, EVICTED, 0, 0)) {
if (log.isDebugEnabled())
@@ -497,15 +495,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
clearDeferredDeletes();
-
- return new GridFinishedFuture<>(true);
}
-
- return cctx.closures().callLocalSafe(new GPC<Boolean>() {
- @Override public Boolean call() {
- return tryEvict(true);
- }
- }, /*system pool*/ true);
+ else {
+ cctx.preloader().evictPartitionAsync(this);
+ }
}
/**
@@ -521,12 +514,11 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
}
/**
- * @param updateSeq Update sequence.
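- * @return {@code True} if entry has been transitioned to state EVICTED.
+ * Attempts to evict this partition; no-op unless state is RENTING, reservations are 0 and not group-reserved.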
*/
- boolean tryEvict(boolean updateSeq) {
+ public void tryEvict() {
if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved())
- return false;
+ return;
// Attempt to evict partition entries from cache.
clearAll();
@@ -545,14 +537,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
rent.onDone();
- ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
+ ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, true);
clearDeferredDeletes();
-
- return true;
}
-
- return false;
}
/**
@@ -592,7 +580,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
*
*/
void onUnlock() {
- tryEvict(true);
+ cctx.preloader().evictPartitionAsync(this);
}
/**
@@ -632,6 +620,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
it = F.concat(it, unswapIt);
}
+ GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer);
+
try {
while (it.hasNext()) {
GridDhtCacheEntry cached = null;
@@ -639,7 +629,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
try {
cached = it.next();
- if (cached.clearInternal(clearVer, swap)) {
+ if (cached.clearInternal(clearVer, swap, extras)) {
map.remove(cached.key(), cached);
if (!cached.isInternal()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 848ad87..885b0dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -116,6 +116,13 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
}
/**
+ * @param updateSeq Update sequence.
+ */
+ void updateSequence(long updateSeq) {
+ this.updateSeq = updateSeq;
+ }
+
+ /**
* @return Update sequence.
*/
long updateSequence() {
@@ -320,7 +327,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts.size(), "super",
+ return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts != null ? parts.size() : 0, "super",
super.toString());
}
}
[4/5] ignite git commit: Ignite-1093
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
deleted file mode 100644
index e993a88..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ /dev/null
@@ -1,1192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheRebalanceMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridLeanSet;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
-import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
-import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
-import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
-
-/**
- * Thread pool for requesting partitions from other nodes
- * and populating local cache.
- */
-@SuppressWarnings("NonConstantFieldWithUpperCaseName")
-public class GridDhtPartitionDemandPool {
- /** Dummy message to wake up a blocking queue if a node leaves. */
- private final SupplyMessage DUMMY_TOP = new SupplyMessage();
-
- /** */
- private final GridCacheContext<?, ?> cctx;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private final ReadWriteLock busyLock;
-
- /** */
- @GridToStringInclude
- private final Collection<DemandWorker> dmdWorkers;
-
- /** Preload predicate. */
- private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
- /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
- @GridToStringInclude
- private SyncFuture syncFut;
-
- /** Preload timeout. */
- private final AtomicLong timeout;
-
- /** Allows demand threads to synchronize their step. */
- private CyclicBarrier barrier;
-
- /** Demand lock. */
- private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
-
- /** */
- private int poolSize;
-
- /** Last timeout object. */
- private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
-
- /** Last exchange future. */
- private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
-
- /**
- * @param cctx Cache context.
- * @param busyLock Shutdown lock.
- */
- public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
- assert cctx != null;
- assert busyLock != null;
-
- this.cctx = cctx;
- this.busyLock = busyLock;
-
- log = cctx.logger(getClass());
-
- boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
-
- poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
- if (enabled) {
- barrier = new CyclicBarrier(poolSize);
-
- dmdWorkers = new ArrayList<>(poolSize);
-
- for (int i = 0; i < poolSize; i++)
- dmdWorkers.add(new DemandWorker(i));
-
- syncFut = new SyncFuture(dmdWorkers);
- }
- else {
- dmdWorkers = Collections.emptyList();
-
- syncFut = new SyncFuture(dmdWorkers);
-
- // Calling onDone() immediately since preloading is disabled.
- syncFut.onDone();
- }
-
- timeout = new AtomicLong(cctx.config().getRebalanceTimeout());
- }
-
- /**
- *
- */
- void start() {
- if (poolSize > 0) {
- for (DemandWorker w : dmdWorkers)
- new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start();
- }
- }
-
- /**
- *
- */
- void stop() {
- U.cancel(dmdWorkers);
-
- if (log.isDebugEnabled())
- log.debug("Before joining on demand workers: " + dmdWorkers);
-
- U.join(dmdWorkers, log);
-
- if (log.isDebugEnabled())
- log.debug("After joining on demand workers: " + dmdWorkers);
-
- lastExchangeFut = null;
-
- lastTimeoutObj.set(null);
- }
-
- /**
- * @return Future for {@link CacheRebalanceMode#SYNC} mode.
- */
- IgniteInternalFuture<?> syncFuture() {
- return syncFut;
- }
-
- /**
- * Sets preload predicate for demand pool.
- *
- * @param preloadPred Preload predicate.
- */
- void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
- this.preloadPred = preloadPred;
- }
-
- /**
- * @return Size of this thread pool.
- */
- int poolSize() {
- return poolSize;
- }
-
- /**
- * Wakes up demand workers when new exchange future was added.
- */
- void onExchangeFutureAdded() {
- synchronized (dmdWorkers) {
- for (DemandWorker w : dmdWorkers)
- w.addMessage(DUMMY_TOP);
- }
- }
-
- /**
- * Force preload.
- */
- void forcePreload() {
- GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
-
- if (obj != null)
- cctx.time().removeTimeoutObject(obj);
-
- final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
- if (exchFut != null) {
- if (log.isDebugEnabled())
- log.debug("Forcing rebalance event for future: " + exchFut);
-
- exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.shared().exchange().forcePreloadExchange(exchFut);
- }
- });
- }
- else if (log.isDebugEnabled())
- log.debug("Ignoring force rebalance request (no topology event happened yet).");
- }
-
- /**
- * @return {@code true} if entered to busy state.
- */
- private boolean enterBusy() {
- if (busyLock.readLock().tryLock())
- return true;
-
- if (log.isDebugEnabled())
- log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId());
-
- return false;
- }
-
- /**
- *
- */
- private void leaveBusy() {
- busyLock.readLock().unlock();
- }
-
- /**
- * @param type Type.
- * @param discoEvt Discovery event.
- */
- private void preloadEvent(int type, DiscoveryEvent discoEvt) {
- preloadEvent(-1, type, discoEvt);
- }
-
- /**
- * @param part Partition.
- * @param type Type.
- * @param discoEvt Discovery event.
- */
- private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
- assert discoEvt != null;
-
- cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
- }
-
- /**
- * @param msg Message to check.
- * @return {@code True} if dummy message.
- */
- private boolean dummyTopology(SupplyMessage msg) {
- return msg == DUMMY_TOP;
- }
-
- /**
- * @param deque Deque to poll from.
- * @param time Time to wait.
- * @param w Worker.
- * @return Polled item.
- * @throws InterruptedException If interrupted.
- */
- @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
- assert w != null;
-
- // There is currently a case where {@code interrupted}
- // flag on a thread gets flipped during stop which causes the pool to hang. This check
- // will always make sure that interrupted flag gets reset before going into wait conditions.
- // The true fix should actually make sure that interrupted flag does not get reset or that
- // interrupted exception gets propagated. Until we find a real fix, this method should
- // always work to make sure that there is no hanging during stop.
- if (w.isCancelled())
- Thread.currentThread().interrupt();
-
- return deque.poll(time, MILLISECONDS);
- }
-
- /**
- * @param p Partition.
- * @param topVer Topology version.
- * @return Picked owners.
- */
- private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
- Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
-
- int affCnt = affNodes.size();
-
- Collection<ClusterNode> rmts = remoteOwners(p, topVer);
-
- int rmtCnt = rmts.size();
-
- if (rmtCnt <= affCnt)
- return rmts;
-
- List<ClusterNode> sorted = new ArrayList<>(rmts);
-
- // Sort in descending order, so nodes with higher order will be first.
- Collections.sort(sorted, CU.nodeComparator(false));
-
- // Pick newest nodes.
- return sorted.subList(0, affCnt);
- }
-
- /**
- * @param p Partition.
- * @param topVer Topology version.
- * @return Nodes owning this partition.
- */
- private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
- return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
- }
-
- /**
- * @param assigns Assignments.
- * @param force {@code True} if dummy reassign.
- */
- void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
- if (log.isDebugEnabled())
- log.debug("Adding partition assignments: " + assigns);
-
- long delay = cctx.config().getRebalanceDelay();
-
- if (delay == 0 || force) {
- assert assigns != null;
-
- synchronized (dmdWorkers) {
- for (DemandWorker w : dmdWorkers) {
- w.addAssignments(assigns);
-
- w.addMessage(DUMMY_TOP);
- }
- }
- }
- else if (delay > 0) {
- assert !force;
-
- GridTimeoutObject obj = lastTimeoutObj.get();
-
- if (obj != null)
- cctx.time().removeTimeoutObject(obj);
-
- final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
- assert exchFut != null : "Delaying rebalance process without topology event.";
-
- obj = new GridTimeoutObjectAdapter(delay) {
- @Override public void onTimeout() {
- exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
- cctx.shared().exchange().forcePreloadExchange(exchFut);
- }
- });
- }
- };
-
- lastTimeoutObj.set(obj);
-
- cctx.time().addTimeoutObject(obj);
- }
- }
-
- /**
- *
- */
- void unwindUndeploys() {
- demandLock.writeLock().lock();
-
- try {
- cctx.deploy().unwind(cctx);
- }
- finally {
- demandLock.writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtPartitionDemandPool.class, this);
- }
-
- /**
- *
- */
- private class DemandWorker extends GridWorker {
- /** Worker ID. */
- private int id;
-
- /** Partition-to-node assignments. */
- private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
-
- /** Message queue. */
- private final LinkedBlockingDeque<SupplyMessage> msgQ =
- new LinkedBlockingDeque<>();
-
- /** Counter. */
- private long cntr;
-
- /** Hide worker logger and use cache logger instead. */
- private IgniteLogger log = GridDhtPartitionDemandPool.this.log;
-
- /**
- * @param id Worker ID.
- */
- private DemandWorker(int id) {
- super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemandPool.this.log);
-
- assert id >= 0;
-
- this.id = id;
- }
-
- /**
- * @param assigns Assignments.
- */
- void addAssignments(GridDhtPreloaderAssignments assigns) {
- assert assigns != null;
-
- assignQ.offer(assigns);
-
- if (log.isDebugEnabled())
- log.debug("Added assignments to worker: " + this);
- }
-
- /**
- * @return {@code True} if topology changed.
- */
- private boolean topologyChanged() {
- return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged();
- }
-
- /**
- * @param msg Message.
- */
- private void addMessage(SupplyMessage msg) {
- if (!enterBusy())
- return;
-
- try {
- assert dummyTopology(msg) || msg.supply().workerId() == id;
-
- msgQ.offer(msg);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- * @param timeout Timed out value.
- */
- private void growTimeout(long timeout) {
- long newTimeout = (long)(timeout * 1.5D);
-
- // Account for overflow.
- if (newTimeout < 0)
- newTimeout = Long.MAX_VALUE;
-
- // Grow by 50% only if another thread didn't do it already.
- if (GridDhtPartitionDemandPool.this.timeout.compareAndSet(timeout, newTimeout))
- U.warn(log, "Increased rebalancing message timeout from " + timeout + "ms to " +
- newTimeout + "ms.");
- }
-
- /**
- * @param pick Node picked for preloading.
- * @param p Partition.
- * @param entry Preloaded entry.
- * @param topVer Topology version.
- * @return {@code False} if partition has become invalid during preloading.
- * @throws IgniteInterruptedCheckedException If interrupted.
- */
- private boolean preloadEntry(
- ClusterNode pick,
- int p,
- GridCacheEntryInfo entry,
- AffinityTopologyVersion topVer
- ) throws IgniteCheckedException {
- try {
- GridCacheEntryEx cached = null;
-
- try {
- cached = cctx.dht().entryEx(entry.key());
-
- if (log.isDebugEnabled())
- log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
-
- if (cctx.dht().isIgfsDataCache() &&
- cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
- LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
- "value, will ignore rebalance entries): " + name());
-
- if (cached.markObsoleteIfEmpty(null))
- cached.context().cache().removeIfObsolete(cached.key());
-
- return true;
- }
-
- if (preloadPred == null || preloadPred.apply(entry)) {
- if (cached.initialValue(
- entry.value(),
- entry.version(),
- entry.ttl(),
- entry.expireTime(),
- true,
- topVer,
- cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
- )) {
- cctx.evicts().touch(cached, topVer); // Start tracking.
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
- cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
- (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
- false, null, null, null);
- }
- else if (log.isDebugEnabled())
- log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
- ", part=" + p + ']');
- }
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
- cached.key() + ", part=" + p + ']');
- }
- catch (GridDhtInvalidPartitionException ignored) {
- if (log.isDebugEnabled())
- log.debug("Partition became invalid during rebalancing (will ignore): " + p);
-
- return false;
- }
- }
- catch (IgniteInterruptedCheckedException e) {
- throw e;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
- cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
- }
-
- return true;
- }
-
- /**
- * @param idx Unique index for this topic.
- * @return Topic for partition.
- */
- public Object topic(long idx) {
- return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
- }
-
- /**
- * @param node Node to demand from.
- * @param topVer Topology version.
- * @param d Demand message.
- * @param exchFut Exchange future.
- * @return Missed partitions.
- * @throws InterruptedException If interrupted.
- * @throws ClusterTopologyCheckedException If node left.
- * @throws IgniteCheckedException If failed to send message.
- */
- private Set<Integer> demandFromNode(
- ClusterNode node,
- final AffinityTopologyVersion topVer,
- GridDhtPartitionDemandMessage d,
- GridDhtPartitionsExchangeFuture exchFut
- ) throws InterruptedException, IgniteCheckedException {
- GridDhtPartitionTopology top = cctx.dht().topology();
-
- cntr++;
-
- d.topic(topic(cntr));
- d.workerId(id);
-
- Set<Integer> missed = new HashSet<>();
-
- // Get the same collection that will be sent in the message.
- Collection<Integer> remaining = d.partitions();
-
- // Drain queue before processing a new node.
- drainQueue();
-
- if (isCancelled() || topologyChanged())
- return missed;
-
- cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
- addMessage(new SupplyMessage(nodeId, msg));
- }
- });
-
- try {
- boolean retry;
-
- // DoWhile.
- // =======
- do {
- retry = false;
-
- // Create copy.
- d = new GridDhtPartitionDemandMessage(d, remaining);
-
- long timeout = GridDhtPartitionDemandPool.this.timeout.get();
-
- d.timeout(timeout);
-
- if (log.isDebugEnabled())
- log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
-
- // Send demand message.
- cctx.io().send(node, d, cctx.ioPolicy());
-
- // While.
- // =====
- while (!isCancelled() && !topologyChanged()) {
- SupplyMessage s = poll(msgQ, timeout, this);
-
- // If timed out.
- if (s == null) {
- if (msgQ.isEmpty()) { // Safety check.
- U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
- " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
- " configuration properties).");
-
- growTimeout(timeout);
-
- // Ordered listener was removed if timeout expired.
- cctx.io().removeOrderedHandler(d.topic());
-
- // Must create copy to be able to work with IO manager thread local caches.
- d = new GridDhtPartitionDemandMessage(d, remaining);
-
- // Create new topic.
- d.topic(topic(++cntr));
-
- // Create new ordered listener.
- cctx.io().addOrderedHandler(d.topic(),
- new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(UUID nodeId,
- GridDhtPartitionSupplyMessage msg) {
- addMessage(new SupplyMessage(nodeId, msg));
- }
- });
-
- // Resend message with larger timeout.
- retry = true;
-
- break; // While.
- }
- else
- continue; // While.
- }
-
- // If topology changed.
- if (dummyTopology(s)) {
- if (topologyChanged())
- break; // While.
- else
- continue; // While.
- }
-
- // Check that message was received from expected node.
- if (!s.senderId().equals(node.id())) {
- U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
- ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
-
- continue; // While.
- }
-
- if (log.isDebugEnabled())
- log.debug("Received supply message: " + s);
-
- GridDhtPartitionSupplyMessage supply = s.supply();
-
- // Check whether there were class loading errors on unmarshal
- if (supply.classError() != null) {
- if (log.isDebugEnabled())
- log.debug("Class got undeployed during preloading: " + supply.classError());
-
- retry = true;
-
- // Quit preloading.
- break;
- }
-
- // Preload.
- for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
- int p = e.getKey();
-
- if (cctx.affinity().localNode(p, topVer)) {
- GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
- assert part != null;
-
- if (part.state() == MOVING) {
- boolean reserved = part.reserve();
-
- assert reserved : "Failed to reserve partition [gridName=" +
- cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
-
- part.lock();
-
- try {
- Collection<Integer> invalidParts = new GridLeanSet<>();
-
- // Loop through all received entries and try to preload them.
- for (GridCacheEntryInfo entry : e.getValue().infos()) {
- if (!invalidParts.contains(p)) {
- if (!part.preloadingPermitted(entry.key(), entry.version())) {
- if (log.isDebugEnabled())
- log.debug("Preloading is not permitted for entry due to " +
- "evictions [key=" + entry.key() +
- ", ver=" + entry.version() + ']');
-
- continue;
- }
-
- if (!preloadEntry(node, p, entry, topVer)) {
- invalidParts.add(p);
-
- if (log.isDebugEnabled())
- log.debug("Got entries for invalid partition during " +
- "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
- }
- }
- }
-
- boolean last = supply.last().contains(p);
-
- // If message was last for this partition,
- // then we take ownership.
- if (last) {
- remaining.remove(p);
-
- top.own(part);
-
- if (log.isDebugEnabled())
- log.debug("Finished rebalancing partition: " + part);
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
- preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
- exchFut.discoveryEvent());
- }
- }
- finally {
- part.unlock();
- part.release();
- }
- }
- else {
- remaining.remove(p);
-
- if (log.isDebugEnabled())
- log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
- }
- }
- else {
- remaining.remove(p);
-
- if (log.isDebugEnabled())
- log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
- }
- }
-
- remaining.removeAll(s.supply().missed());
-
- // Only request partitions based on latest topology version.
- for (Integer miss : s.supply().missed())
- if (cctx.affinity().localNode(miss, topVer))
- missed.add(miss);
-
- if (remaining.isEmpty())
- break; // While.
-
- if (s.supply().ack()) {
- retry = true;
-
- break;
- }
- }
- }
- while (retry && !isCancelled() && !topologyChanged());
-
- return missed;
- }
- finally {
- cctx.io().removeOrderedHandler(d.topic());
- }
- }
-
- /**
- * @throws InterruptedException If interrupted.
- */
- private void drainQueue() throws InterruptedException {
- while (!msgQ.isEmpty()) {
- SupplyMessage msg = msgQ.take();
-
- if (log.isDebugEnabled())
- log.debug("Drained supply message: " + msg);
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- try {
- int rebalanceOrder = cctx.config().getRebalanceOrder();
-
- if (!CU.isMarshallerCache(cctx.name())) {
- if (log.isDebugEnabled())
- log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
-
- try {
- cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
- "[cacheName=" + cctx.name() + ']');
-
- return;
- }
- catch (IgniteCheckedException e) {
- throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
- }
- }
-
- if (rebalanceOrder > 0) {
- IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
-
- try {
- if (fut != null) {
- if (log.isDebugEnabled())
- log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
- ", rebalanceOrder=" + rebalanceOrder + ']');
-
- fut.get();
- }
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
- "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
-
- return;
- }
- catch (IgniteCheckedException e) {
- throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
- }
- }
-
- GridDhtPartitionsExchangeFuture exchFut = null;
-
- boolean stopEvtFired = false;
-
- while (!isCancelled()) {
- try {
- barrier.await();
-
- if (id == 0 && exchFut != null && !exchFut.dummy() &&
- cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) {
-
- if (!cctx.isReplicated() || !stopEvtFired) {
- preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
- stopEvtFired = true;
- }
- }
- }
- catch (BrokenBarrierException ignore) {
- throw new InterruptedException("Demand worker stopped.");
- }
-
- // Sync up all demand threads at this step.
- GridDhtPreloaderAssignments assigns = null;
-
- while (assigns == null)
- assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
-
- demandLock.readLock().lock();
-
- try {
- exchFut = assigns.exchangeFuture();
-
- // Assignments are empty if preloading is disabled.
- if (assigns.isEmpty())
- continue;
-
- boolean resync = false;
-
- // While.
- // =====
- while (!isCancelled() && !topologyChanged() && !resync) {
- Collection<Integer> missed = new HashSet<>();
-
- // For.
- // ===
- for (ClusterNode node : assigns.keySet()) {
- if (topologyChanged() || isCancelled())
- break; // For.
-
- GridDhtPartitionDemandMessage d = assigns.remove(node);
-
- // If another thread is already processing this message,
- // move to the next node.
- if (d == null)
- continue; // For.
-
- try {
- Set<Integer> set = demandFromNode(node, assigns.topologyVersion(), d, exchFut);
-
- if (!set.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
- set + ']');
-
- missed.addAll(set);
- }
- }
- catch (IgniteInterruptedCheckedException e) {
- throw e;
- }
- catch (ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
- ", msg=" + e.getMessage() + ']');
-
- resync = true;
-
- break; // For.
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to receive partitions from node (rebalancing will not " +
- "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
- }
- }
-
- // Processed missed entries.
- if (!missed.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Reassigning partitions that were missed: " + missed);
-
- assert exchFut.exchangeId() != null;
-
- cctx.shared().exchange().forceDummyExchange(true, exchFut);
- }
- else
- break; // While.
- }
- }
- finally {
- demandLock.readLock().unlock();
-
- syncFut.onWorkerDone(this);
- }
-
- cctx.shared().exchange().scheduleResendPartitions();
- }
- }
- finally {
- // Safety.
- syncFut.onWorkerDone(this);
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
- }
- }
-
- /**
- * Sets last exchange future.
- *
- * @param lastFut Last future to set.
- */
- void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
- lastExchangeFut = lastFut;
- }
-
- /**
- * @param exchFut Exchange future.
- * @return Assignments of partitions to nodes.
- */
- GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
- // No assignments for disabled preloader.
- GridDhtPartitionTopology top = cctx.dht().topology();
-
- if (!cctx.rebalanceEnabled())
- return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
- int partCnt = cctx.affinity().partitions();
-
- assert exchFut.forcePreload() || exchFut.dummyReassign() ||
- exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
- "Topology version mismatch [exchId=" + exchFut.exchangeId() +
- ", topVer=" + top.topologyVersion() + ']';
-
- GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
- AffinityTopologyVersion topVer = assigns.topologyVersion();
-
- for (int p = 0; p < partCnt; p++) {
- if (cctx.shared().exchange().hasPendingExchange()) {
- if (log.isDebugEnabled())
- log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
- exchFut.exchangeId());
-
- break;
- }
-
- // If partition belongs to local node.
- if (cctx.affinity().localNode(p, topVer)) {
- GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
- assert part != null;
- assert part.id() == p;
-
- if (part.state() != MOVING) {
- if (log.isDebugEnabled())
- log.debug("Skipping partition assignment (state is not MOVING): " + part);
-
- continue; // For.
- }
-
- Collection<ClusterNode> picked = pickedOwners(p, topVer);
-
- if (picked.isEmpty()) {
- top.own(part);
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
- DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
- cctx.events().addPreloadEvent(p,
- EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
- discoEvt.type(), discoEvt.timestamp());
- }
-
- if (log.isDebugEnabled())
- log.debug("Owning partition as there are no other owners: " + part);
- }
- else {
- ClusterNode n = F.first(picked);
-
- GridDhtPartitionDemandMessage msg = assigns.get(n);
-
- if (msg == null) {
- assigns.put(n, msg = new GridDhtPartitionDemandMessage(
- top.updateSequence(),
- exchFut.exchangeId().topologyVersion(),
- cctx.cacheId()));
- }
-
- msg.addPartition(p);
- }
- }
- }
-
- return assigns;
- }
-
- /**
- *
- */
- private class SyncFuture extends GridFutureAdapter<Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Remaining workers. */
- private Collection<DemandWorker> remaining;
-
- /**
- * @param workers List of workers.
- */
- private SyncFuture(Collection<DemandWorker> workers) {
- assert workers.size() == poolSize();
-
- remaining = Collections.synchronizedList(new LinkedList<>(workers));
- }
-
- /**
- * @param w Worker who iterated through all partitions.
- */
- void onWorkerDone(DemandWorker w) {
- if (isDone())
- return;
-
- if (remaining.remove(w))
- if (log.isDebugEnabled())
- log.debug("Completed full partition iteration for worker [worker=" + w + ']');
-
- if (remaining.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Completed sync future.");
-
- onDone();
- }
- }
- }
-
- /**
- * Supply message wrapper.
- */
- private static class SupplyMessage {
- /** Sender ID. */
- private UUID sndId;
-
- /** Supply message. */
- private GridDhtPartitionSupplyMessage supply;
-
- /**
- * Dummy constructor.
- */
- private SupplyMessage() {
- // No-op.
- }
-
- /**
- * @param sndId Sender ID.
- * @param supply Supply message.
- */
- SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
- this.sndId = sndId;
- this.supply = supply;
- }
-
- /**
- * @return Sender ID.
- */
- UUID senderId() {
- return sndId;
- }
-
- /**
- * @return Message.
- */
- GridDhtPartitionSupplyMessage supply() {
- return supply;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(SupplyMessage.class, this);
- }
- }
-}
\ No newline at end of file
[2/5] ignite git commit: Ignite-1093
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
new file mode 100644
index 0000000..694088b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -0,0 +1,999 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
+/**
+ * Handles partition demand messages and supplies partitions to demanding nodes.
+ */
+class GridDhtPartitionSupplier {
+ /** */
+ private final GridCacheContext<?, ?> cctx;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private GridDhtPartitionTopology top;
+
+ /** */
+ private final boolean depEnabled;
+
+ /** Preload predicate. */
+ private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+ /** Supply context map. T2: nodeId, idx. */
+ private final ConcurrentHashMap8<T2<UUID, Integer>, SupplyContext> scMap =
+ new ConcurrentHashMap8<>();
+
+ /** Rebalancing listener. */
+ private GridLocalEventListener lsnr;
+
+ /**
+ * @param cctx Cache context.
+ */
+ GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
+ assert cctx != null;
+
+ this.cctx = cctx;
+
+ log = cctx.logger(getClass());
+
+ top = cctx.dht().topology();
+
+ depEnabled = cctx.gridDeploy().enabled();
+ }
+
+ /**
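+ * Registers a listener that clears supply contexts of left or failed nodes and starts the legacy demand handler.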
+ */
+ void start() {
+ lsnr = new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ if (evt instanceof DiscoveryEvent) {
+ for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) {
+ T2<UUID, Integer> t = entry.getKey();
+
+ if (t.get1().equals(((DiscoveryEvent)evt).eventNode().id())) {
+ SupplyContext sctx = entry.getValue();
+
+ clearContext(sctx, log);
+
+ if (log.isDebugEnabled())
+ log.debug("Supply context removed for failed or left node [node=" + t.get1() + "]");
+
+ scMap.remove(t, sctx);
+ }
+ }
+ }
+ else {
+ assert false;
+ }
+ }
+ };
+
+ cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+ startOldListeners();
+ }
+
+ /**
+ * Removes the node listener, clears all saved supply contexts and stops the legacy demand handler.
+ */
+ void stop() {
+ if (lsnr != null)
+ cctx.events().removeListener(lsnr);
+
+ for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) {
+ clearContext(entry.getValue(), log);
+ }
+
+ stopOldListeners();
+ }
+
+ /**
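+ * Clears context: closes its entry iterator (if it is closeable and still open)
+ * and releases the local partition reservation, if one is held.
+ *
+ * @param sc Supply context, may be {@code null}.
+ * @param log Logger.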
+ */
+ private static void clearContext(
+ final SupplyContext sc,
+ final IgniteLogger log) {
+ if (sc != null) {
+ final Iterator it = sc.entryIt;
+
+ if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) {
+ try {
+ synchronized (it) {
+ if (!((GridCloseableIterator)it).isClosed())
+ ((GridCloseableIterator)it).close();
+ }
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Iterator close failed.", e);
+ }
+ }
+
+ final GridDhtLocalPartition loc = sc.loc;
+
+ if (loc != null && loc.reservations() > 0) {
+ synchronized (loc) {
+ if (loc.reservations() > 0)
+ loc.release();
+ }
+
+ }
+ }
+ }
+
+ /**
+ * Sets preload predicate for supply pool.
+ *
+ * @param preloadPred Preload predicate.
+ */
+ void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+ this.preloadPred = preloadPred;
+ }
+
+ /**
+ * @param idx Index.
+ * @param id Node uuid.
+ * @param d Demand message.
+ */
+ public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
+ assert d != null;
+ assert id != null;
+
+ AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion demTop = d.topologyVersion();
+
+ if (cutTop.compareTo(demTop) > 0) {
+ if (log.isDebugEnabled())
+ log.debug("Demand request cancelled [current=" + cutTop + ", demanded=" + demTop +
+ ", from=" + id + ", idx=" + idx + "]");
+
+ return;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Demand request accepted [current=" + cutTop + ", demanded=" + demTop +
+ ", from=" + id + ", idx=" + idx + "]");
+
+ GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(
+ d.updateSequence(), cctx.cacheId(), d.topologyVersion());
+
+ ClusterNode node = cctx.discovery().node(id);
+
+ T2<UUID, Integer> scId = new T2<>(id, idx);
+
+ try {
+ SupplyContext sctx = scMap.remove(scId);
+
+ if (sctx != null && (!d.topologyVersion().equals(sctx.topVer) || d.updateSequence() != sctx.updateSeq)) {
+ clearContext(sctx, log);
+
+ sctx = null;
+ }
+
+ if (sctx == null && d.partitions() == null)
+ return;
+
+ assert !(sctx != null && d.partitions() != null);
+
+ long bCnt = 0;
+
+ int phase = 0;
+
+ boolean newReq = true;
+
+ long maxBatchesCnt = cctx.config().getRebalanceBatchesCount();
+
+ if (sctx != null) {
+ phase = sctx.phase;
+
+ maxBatchesCnt = 1;
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Starting supplying rebalancing [cache=" + cctx.name() +
+ ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
+ ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
+ ", idx=" + idx + "]");
+ }
+
+ Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
+
+ while ((sctx != null && newReq) || partIt.hasNext()) {
+ int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+ newReq = false;
+
+ GridDhtLocalPartition loc;
+
+ if (sctx != null && sctx.loc != null) {
+ loc = sctx.loc;
+
+ assert loc.reservations() > 0;
+ }
+ else {
+ loc = top.localPartition(part, d.topologyVersion(), false);
+
+ if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+ // Reply with partition of "-1" to let sender know that
+ // this node is no longer an owner.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Requested partition is not owned by local node [part=" + part +
+ ", demander=" + id + ']');
+
+ continue;
+ }
+ }
+
+ GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+ try {
+ if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
+ swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+ cctx.swap().addOffHeapListener(part, swapLsnr);
+ cctx.swap().addSwapListener(part, swapLsnr);
+ }
+
+ boolean partMissing = false;
+
+ if (phase == 0)
+ phase = 1;
+
+ if (phase == 1) {
+ Iterator<GridDhtCacheEntry> entIt = sctx != null ?
+ (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
+
+ while (entIt.hasNext()) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition, so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition [part=" + part +
+ ", nodeId=" + id + ']');
+
+ partMissing = true;
+
+ break;
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ if (++bCnt >= maxBatchesCnt) {
+ saveSupplyContext(scId,
+ phase,
+ partIt,
+ part,
+ entIt,
+ swapLsnr,
+ loc,
+ d.topologyVersion(),
+ d.updateSequence());
+
+ swapLsnr = null;
+ loc = null;
+
+ reply(node, d, s, scId);
+
+ return;
+ }
+ else {
+ if (!reply(node, d, s, scId))
+ return;
+
+ s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+ cctx.cacheId(), d.topologyVersion());
+ }
+ }
+
+ GridCacheEntryEx e = entIt.next();
+
+ GridCacheEntryInfo info = e.info();
+
+ if (info != null && !info.isNew()) {
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry(part, info, cctx);
+ else if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+ info);
+ }
+ }
+
+ if (partMissing)
+ continue;
+
+ }
+
+ if (phase == 1) {
+ phase = 2;
+
+ if (sctx != null) {
+ sctx = new SupplyContext(
+ phase,
+ partIt,
+ null,
+ swapLsnr,
+ part,
+ loc,
+ d.topologyVersion(),
+ d.updateSequence());
+ }
+ }
+
+ if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
+ GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+ sctx != null && sctx.entryIt != null ?
+ (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
+ cctx.swap().iterator(part);
+
+ // Iterator may be null if space does not exist.
+ if (iter != null) {
+ boolean prepared = false;
+
+ while (iter.hasNext()) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition,
+ // so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition " +
+ "[part=" + part + ", nodeId=" + id + ']');
+
+ partMissing = true;
+
+ break; // For.
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ if (++bCnt >= maxBatchesCnt) {
+ saveSupplyContext(scId,
+ phase,
+ partIt,
+ part,
+ iter,
+ swapLsnr,
+ loc,
+ d.topologyVersion(),
+ d.updateSequence());
+
+ swapLsnr = null;
+ loc = null;
+
+ reply(node, d, s, scId);
+
+ return;
+ }
+ else {
+ if (!reply(node, d, s, scId))
+ return;
+
+ s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+ cctx.cacheId(), d.topologyVersion());
+ }
+ }
+
+ Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
+
+ GridCacheSwapEntry swapEntry = e.getValue();
+
+ GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+ info.keyBytes(e.getKey());
+ info.ttl(swapEntry.ttl());
+ info.expireTime(swapEntry.expireTime());
+ info.version(swapEntry.version());
+ info.value(swapEntry.value());
+
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry0(part, info, cctx);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not send " +
+ "cache entry): " + info);
+
+ continue;
+ }
+
+ // Need to manually prepare cache message.
+ if (depEnabled && !prepared) {
+ ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+ cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+ swapEntry.valueClassLoaderId() != null ?
+ cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+ null;
+
+ if (ldr == null)
+ continue;
+
+ if (ldr instanceof GridDeploymentInfo) {
+ s.prepare((GridDeploymentInfo)ldr);
+
+ prepared = true;
+ }
+ }
+ }
+
+ iter.close();
+
+ if (partMissing)
+ continue;
+ }
+ }
+
+ if (swapLsnr == null && sctx != null)
+ swapLsnr = sctx.swapLsnr;
+
+ // Stop receiving promote notifications.
+ if (swapLsnr != null) {
+ cctx.swap().removeOffHeapListener(part, swapLsnr);
+ cctx.swap().removeSwapListener(part, swapLsnr);
+ }
+
+ if (phase == 2) {
+ phase = 3;
+
+ if (sctx != null) {
+ sctx = new SupplyContext(
+ phase,
+ partIt,
+ null,
+ null,
+ part,
+ loc,
+ d.topologyVersion(),
+ d.updateSequence());
+ }
+ }
+
+ if (phase == 3 && swapLsnr != null) {
+ Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+ swapLsnr = null;
+
+ Iterator<GridCacheEntryInfo> lsnrIt = sctx != null && sctx.entryIt != null ?
+ (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
+
+ while (lsnrIt.hasNext()) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition,
+ // so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition " +
+ "[part=" + part + ", nodeId=" + id + ']');
+
+ // No need to continue iteration over swap entries.
+ break;
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ if (++bCnt >= maxBatchesCnt) {
+ saveSupplyContext(scId,
+ phase,
+ partIt,
+ part,
+ lsnrIt,
+ swapLsnr,
+ loc,
+ d.topologyVersion(),
+ d.updateSequence());
+
+ loc = null;
+
+ reply(node, d, s, scId);
+
+ return;
+ }
+ else {
+ if (!reply(node, d, s, scId))
+ return;
+
+ s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+ cctx.cacheId(), d.topologyVersion());
+ }
+ }
+
+ GridCacheEntryInfo info = lsnrIt.next();
+
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry(part, info, cctx);
+ else if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+ info);
+ }
+ }
+
+ // Mark as last supply message.
+ s.last(part);
+
+ phase = 0;
+
+ sctx = null;
+ }
+ finally {
+ if (loc != null)
+ loc.release();
+
+ if (swapLsnr != null) {
+ cctx.swap().removeOffHeapListener(part, swapLsnr);
+ cctx.swap().removeSwapListener(part, swapLsnr);
+ }
+ }
+ }
+
+ scMap.remove(scId);
+
+ reply(node, d, s, scId);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished supplying rebalancing [cache=" + cctx.name() +
+ ", fromNode=" + node.id() +
+ ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
+ ", idx=" + idx + "]");
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send partition supply message to node: " + id, e);
+ }
+ }
+
+ /**
+ * @param n Node.
+ * @param d Demand message.
+ * @param s Supply message.
+ * @return {@code True} if message was sent, {@code false} if recipient left grid.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean reply(ClusterNode n,
+ GridDhtPartitionDemandMessage d,
+ GridDhtPartitionSupplyMessageV2 s,
+ T2<UUID, Integer> scId)
+ throws IgniteCheckedException {
+
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+ cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+ // Throttle preloading.
+ if (cctx.config().getRebalanceThrottle() > 0)
+ U.sleep(cctx.config().getRebalanceThrottle());
+
+ return true;
+ }
+ catch (ClusterTopologyCheckedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+ clearContext(scMap.remove(scId), log);
+
+ return false;
+ }
+ }
+
+ /**
+ * @param t Supply context ID (node ID, index).
+ * @param phase Phase.
+ * @param partIt Partition iterator.
+ * @param part Partition.
+ * @param entryIt Entry iterator.
+ * @param swapLsnr Swap listener.
+ */
+ private void saveSupplyContext(
+ T2<UUID, Integer> t,
+ int phase,
+ Iterator<Integer> partIt,
+ int part,
+ Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr,
+ GridDhtLocalPartition loc,
+ AffinityTopologyVersion topVer,
+ long updateSeq) {
+ SupplyContext old = scMap.putIfAbsent(t,
+ new SupplyContext(phase,
+ partIt,
+ entryIt,
+ swapLsnr,
+ part,
+ loc,
+ topVer,
+ updateSeq));
+
+ assert old == null;
+ }
+
+ /**
+ * Supply context.
+ */
+ private static class SupplyContext {
+ /** Phase. */
+ private final int phase;
+
+ /** Partition iterator. */
+ private final Iterator<Integer> partIt;
+
+ /** Entry iterator. */
+ private final Iterator<?> entryIt;
+
+ /** Swap listener. */
+ private final GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+ /** Partition. */
+ private final int part;
+
+ /** Local partition. */
+ private final GridDhtLocalPartition loc;
+
+ /** Topology version. */
+ private final AffinityTopologyVersion topVer;
+
+ /** Update seq. */
+ private final long updateSeq;
+
+ /**
+ * @param phase Phase.
+ * @param partIt Partition iterator.
+ * @param entryIt Entry iterator.
+ * @param swapLsnr Swap listener.
+ * @param part Partition.
+ */
+ public SupplyContext(int phase,
+ Iterator<Integer> partIt,
+ Iterator<?> entryIt,
+ GridCacheEntryInfoCollectSwapListener swapLsnr,
+ int part,
+ GridDhtLocalPartition loc,
+ AffinityTopologyVersion topVer,
+ long updateSeq) {
+ this.phase = phase;
+ this.partIt = partIt;
+ this.entryIt = entryIt;
+ this.swapLsnr = swapLsnr;
+ this.part = part;
+ this.loc = loc;
+ this.topVer = topVer;
+ this.updateSeq = updateSeq;
+ }
+ }
+
+ @Deprecated//Backward compatibility. To be removed in future.
+ public void startOldListeners() {
+ if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) {
+ cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+ @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+ processOldDemandMessage(m, id);
+ }
+ });
+ }
+ }
+
+ @Deprecated//Backward compatibility. To be removed in future.
+ public void stopOldListeners() {
+ if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) {
+
+ cctx.io().removeHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class);
+ }
+ }
+
+ /**
+ * @param d Demand message.
+ * @param id Demanding node ID.
+ */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) {
+ GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+ d.updateSequence(), cctx.cacheId());
+
+ ClusterNode node = cctx.node(id);
+
+ long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+ boolean ack = false;
+
+ try {
+ for (int part : d.partitions()) {
+ GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+ if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+ // Reply with partition of "-1" to let sender know that
+ // this node is no longer an owner.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Requested partition is not owned by local node [part=" + part +
+ ", demander=" + id + ']');
+
+ continue;
+ }
+
+ GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+ try {
+ if (cctx.isSwapOrOffheapEnabled()) {
+ swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+ cctx.swap().addOffHeapListener(part, swapLsnr);
+ cctx.swap().addSwapListener(part, swapLsnr);
+ }
+
+ boolean partMissing = false;
+
+ for (GridCacheEntryEx e : loc.entries()) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition, so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition [part=" + part +
+ ", nodeId=" + id + ']');
+
+ partMissing = true;
+
+ break;
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ ack = true;
+
+ if (!replyOld(node, d, s))
+ return;
+
+ // Throttle preloading.
+ if (preloadThrottle > 0)
+ U.sleep(preloadThrottle);
+
+ s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+ cctx.cacheId());
+ }
+
+ GridCacheEntryInfo info = e.info();
+
+ if (info != null && !info.isNew()) {
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry(part, info, cctx);
+ else if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+ info);
+ }
+ }
+
+ if (partMissing)
+ continue;
+
+ if (cctx.isSwapOrOffheapEnabled()) {
+ GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+ cctx.swap().iterator(part);
+
+ // Iterator may be null if space does not exist.
+ if (iter != null) {
+ try {
+ boolean prepared = false;
+
+ for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition,
+ // so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition " +
+ "[part=" + part + ", nodeId=" + id + ']');
+
+ partMissing = true;
+
+ break; // For.
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ ack = true;
+
+ if (!replyOld(node, d, s))
+ return;
+
+ // Throttle preloading.
+ if (preloadThrottle > 0)
+ U.sleep(preloadThrottle);
+
+ s = new GridDhtPartitionSupplyMessage(d.workerId(),
+ d.updateSequence(), cctx.cacheId());
+ }
+
+ GridCacheSwapEntry swapEntry = e.getValue();
+
+ GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+ info.keyBytes(e.getKey());
+ info.ttl(swapEntry.ttl());
+ info.expireTime(swapEntry.expireTime());
+ info.version(swapEntry.version());
+ info.value(swapEntry.value());
+
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry0(part, info, cctx);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not send " +
+ "cache entry): " + info);
+
+ continue;
+ }
+
+ // Need to manually prepare cache message.
+ if (depEnabled && !prepared) {
+ ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+ cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+ swapEntry.valueClassLoaderId() != null ?
+ cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+ null;
+
+ if (ldr == null)
+ continue;
+
+ if (ldr instanceof GridDeploymentInfo) {
+ s.prepare((GridDeploymentInfo)ldr);
+
+ prepared = true;
+ }
+ }
+ }
+
+ if (partMissing)
+ continue;
+ }
+ finally {
+ iter.close();
+ }
+ }
+ }
+
+ // Stop receiving promote notifications.
+ if (swapLsnr != null) {
+ cctx.swap().removeOffHeapListener(part, swapLsnr);
+ cctx.swap().removeSwapListener(part, swapLsnr);
+ }
+
+ if (swapLsnr != null) {
+ Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+ swapLsnr = null;
+
+ for (GridCacheEntryInfo info : entries) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition,
+ // so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition " +
+ "[part=" + part + ", nodeId=" + id + ']');
+
+ // No need to continue iteration over swap entries.
+ break;
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ ack = true;
+
+ if (!replyOld(node, d, s))
+ return;
+
+ s = new GridDhtPartitionSupplyMessage(d.workerId(),
+ d.updateSequence(),
+ cctx.cacheId());
+ }
+
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry(part, info, cctx);
+ else if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+ info);
+ }
+ }
+
+ // Mark as last supply message.
+ s.last(part);
+
+ if (ack) {
+ s.markAck();
+
+ break; // Partition for loop.
+ }
+ }
+ finally {
+ loc.release();
+
+ if (swapLsnr != null) {
+ cctx.swap().removeOffHeapListener(part, swapLsnr);
+ cctx.swap().removeSwapListener(part, swapLsnr);
+ }
+ }
+ }
+
+ replyOld(node, d, s);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
+ }
+ }
+
+ /**
+ * @param n Node.
+ * @param d Demand message.
+ * @param s Supply message.
+ * @return {@code True} if message was sent, {@code false} if recipient left grid.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+ throws IgniteCheckedException {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+ cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+ return true;
+ }
+ catch (ClusterTopologyCheckedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
new file mode 100644
index 0000000..bb89a42
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Partition supply message.
+ */
+public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Update sequence. */
+ private long updateSeq;
+
+ /** Acknowledgement flag. */
+ private boolean ack;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Partitions that have been fully sent. */
+ @GridDirectCollection(int.class)
+ private Collection<Integer> last;
+
+ /** Partitions which were not found. */
+ @GridToStringInclude
+ @GridDirectCollection(int.class)
+ private Collection<Integer> missed;
+
+ /** Entries. */
+ @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
+ private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>();
+
+ /** Message size. */
+ @GridDirectTransient
+ private int msgSize;
+
+ /**
+ * @param updateSeq Update sequence for this node.
+ * @param cacheId Cache ID.
+ */
+ GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer) {
+ assert updateSeq > 0;
+
+ this.cacheId = cacheId;
+ this.updateSeq = updateSeq;
+ this.topVer = topVer;
+ }
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public GridDhtPartitionSupplyMessageV2() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean ignoreClassErrors() {
+ return true;
+ }
+
+ /**
+ * @return Update sequence.
+ */
+ long updateSequence() {
+ return updateSeq;
+ }
+
+ /**
+ * Marks this message for acknowledgment.
+ */
+ void markAck() {
+ ack = true;
+ }
+
+ /**
+ * @return Acknowledgement flag.
+ */
+ boolean ack() {
+ return ack;
+ }
+
+ /**
+ * @return Topology version carried by this supply message.
+ */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Partitions that have been fully sent (empty collection if none were marked).
+ */
+ Collection<Integer> last() {
+ return last == null ? Collections.<Integer>emptySet() : last;
+ }
+
+ /**
+ * @param p Partition which was fully sent.
+ */
+ void last(int p) {
+ if (last == null)
+ last = new HashSet<>();
+
+ if (last.add(p)) {
+ msgSize += 4;
+
+ // If no entries were added for this partition, still register it with an empty collection.
+ if (!infos.containsKey(p)) {
+ CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection();
+
+ infoCol.init();
+
+ infos.put(p, infoCol);
+ }
+ }
+ }
+
+ /**
+ * @param p Missed partition.
+ */
+ void missed(int p) {
+ if (missed == null)
+ missed = new HashSet<>();
+
+ if (missed.add(p))
+ msgSize += 4;
+ }
+
+ /**
+ * @return Missed partitions.
+ */
+ Collection<Integer> missed() {
+ return missed == null ? Collections.<Integer>emptySet() : missed;
+ }
+
+ /**
+ * @return Entries.
+ */
+ Map<Integer, CacheEntryInfoCollection> infos() {
+ return infos;
+ }
+
+ /**
+ * @return Message size.
+ */
+ int messageSize() {
+ return msgSize;
+ }
+
+ /**
+ * @param p Partition.
+ * @param info Entry to add.
+ * @param ctx Cache context.
+ * @throws IgniteCheckedException If failed.
+ */
+ void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+ assert info != null;
+
+ marshalInfo(info, ctx);
+
+ msgSize += info.marshalledSize(ctx);
+
+ CacheEntryInfoCollection infoCol = infos.get(p);
+
+ if (infoCol == null) {
+ msgSize += 4;
+
+ infos.put(p, infoCol = new CacheEntryInfoCollection());
+
+ infoCol.init();
+ }
+
+ infoCol.add(info);
+ }
+
+ /**
+ * @param p Partition.
+ * @param info Entry to add.
+ * @param ctx Cache context.
+ * @throws IgniteCheckedException If failed.
+ */
+ void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+ assert info != null;
+ assert (info.key() != null || info.keyBytes() != null);
+ assert info.value() != null;
+
+ // Need to call this method to initialize info properly.
+ marshalInfo(info, ctx);
+
+ msgSize += info.marshalledSize(ctx);
+
+ CacheEntryInfoCollection infoCol = infos.get(p);
+
+ if (infoCol == null) {
+ msgSize += 4;
+
+ infos.put(p, infoCol = new CacheEntryInfoCollection());
+
+ infoCol.init();
+ }
+
+ infoCol.add(info);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
+
+ for (CacheEntryInfoCollection col : infos().values()) {
+ List<GridCacheEntryInfo> entries = col.infos();
+
+ for (int i = 0; i < entries.size(); i++)
+ entries.get(i).unmarshal(cacheCtx, ldr);
+ }
+ }
+
+ /**
+ * @return Number of partitions for which this message holds an entry collection.
+ */
+ public int size() {
+ return infos.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeBoolean("ack", ack))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeLong("updateSeq", updateSeq))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ ack = reader.readBoolean("ack");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ last = reader.readCollection("last", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ updateSeq = reader.readLong("updateSeq");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 114;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 9;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtPartitionSupplyMessageV2.class, this,
+ "size", size(),
+ "parts", infos.keySet(),
+ "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
deleted file mode 100644
index fe328ef..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.io.Externalizable;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.locks.ReadWriteLock;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.util.lang.GridCloseableIterator;
-import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
-
-/**
- * Thread pool for supplying partitions to demanding nodes.
- */
-class GridDhtPartitionSupplyPool {
- /** */
- private final GridCacheContext<?, ?> cctx;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private final ReadWriteLock busyLock;
-
- /** */
- private GridDhtPartitionTopology top;
-
- /** */
- private final Collection<SupplyWorker> workers = new LinkedList<>();
-
- /** */
- private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<>();
-
- /** */
- private final boolean depEnabled;
-
- /** Preload predicate. */
- private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
- /**
- * @param cctx Cache context.
- * @param busyLock Shutdown lock.
- */
- GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
- assert cctx != null;
- assert busyLock != null;
-
- this.cctx = cctx;
- this.busyLock = busyLock;
-
- log = cctx.logger(getClass());
-
- top = cctx.dht().topology();
-
- if (!cctx.kernalContext().clientNode()) {
- int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
- for (int i = 0; i < poolSize; i++)
- workers.add(new SupplyWorker());
-
- cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
- @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
- processDemandMessage(id, m);
- }
- });
- }
-
- depEnabled = cctx.gridDeploy().enabled();
- }
-
- /**
- *
- */
- void start() {
- for (SupplyWorker w : workers)
- new IgniteThread(cctx.gridName(), "preloader-supply-worker", w).start();
- }
-
- /**
- *
- */
- void stop() {
- U.cancel(workers);
- U.join(workers, log);
-
- top = null;
- }
-
- /**
- * Sets preload predicate for supply pool.
- *
- * @param preloadPred Preload predicate.
- */
- void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
- this.preloadPred = preloadPred;
- }
-
- /**
- * @return Size of this thread pool.
- */
- int poolSize() {
- return cctx.config().getRebalanceThreadPoolSize();
- }
-
- /**
- * @return {@code true} if entered to busy state.
- */
- private boolean enterBusy() {
- if (busyLock.readLock().tryLock())
- return true;
-
- if (log.isDebugEnabled())
- log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
-
- return false;
- }
-
- /**
- * @param nodeId Sender node ID.
- * @param d Message.
- */
- private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) {
- if (!enterBusy())
- return;
-
- try {
- if (cctx.rebalanceEnabled()) {
- if (log.isDebugEnabled())
- log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']');
-
- queue.offer(new DemandMessage(nodeId, d));
- }
- else
- U.warn(log, "Received partition demand message when rebalancing is disabled (will ignore): " + d);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- *
- */
- private void leaveBusy() {
- busyLock.readLock().unlock();
- }
-
- /**
- * @param deque Deque to poll from.
- * @param w Worker.
- * @return Polled item.
- * @throws InterruptedException If interrupted.
- */
- @Nullable private <T> T poll(BlockingQueue<T> deque, GridWorker w) throws InterruptedException {
- assert w != null;
-
- // There is currently a case where {@code interrupted}
- // flag on a thread gets flipped during stop which causes the pool to hang. This check
- // will always make sure that interrupted flag gets reset before going into wait conditions.
- // The true fix should actually make sure that interrupted flag does not get reset or that
- // interrupted exception gets propagated. Until we find a real fix, this method should
- // always work to make sure that there is no hanging during stop.
- if (w.isCancelled())
- Thread.currentThread().interrupt();
-
- return deque.poll(2000, MILLISECONDS);
- }
-
- /**
- * Supply work.
- */
- private class SupplyWorker extends GridWorker {
- /** Hide worker logger and use cache logger. */
- private IgniteLogger log = GridDhtPartitionSupplyPool.this.log;
-
- /**
- * Default constructor.
- */
- private SupplyWorker() {
- super(cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- while (!isCancelled()) {
- DemandMessage msg = poll(queue, this);
-
- if (msg == null)
- continue;
-
- ClusterNode node = cctx.discovery().node(msg.senderId());
-
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Received message from non-existing node (will ignore): " + msg);
-
- continue;
- }
-
- processMessage(msg, node);
- }
- }
-
- /**
- * @param msg Message.
- * @param node Demander.
- */
- private void processMessage(DemandMessage msg, ClusterNode node) {
- assert msg != null;
- assert node != null;
-
- GridDhtPartitionDemandMessage d = msg.message();
-
- GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(), cctx.cacheId());
-
- long preloadThrottle = cctx.config().getRebalanceThrottle();
-
- boolean ack = false;
-
- try {
- for (int part : d.partitions()) {
- GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
- if (loc == null || loc.state() != OWNING || !loc.reserve()) {
- // Reply with partition of "-1" to let sender know that
- // this node is no longer an owner.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Requested partition is not owned by local node [part=" + part +
- ", demander=" + msg.senderId() + ']');
-
- continue;
- }
-
- GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
- try {
- if (cctx.isSwapOrOffheapEnabled()) {
- swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
-
- cctx.swap().addOffHeapListener(part, swapLsnr);
- cctx.swap().addSwapListener(part, swapLsnr);
- }
-
- boolean partMissing = false;
-
- for (GridCacheEntryEx e : loc.entries()) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition, so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition [part=" + part +
- ", nodeId=" + msg.senderId() + ']');
-
- partMissing = true;
-
- break;
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
- if (!reply(node, d, s))
- return;
-
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
- cctx.cacheId());
- }
-
- GridCacheEntryInfo info = e.info();
-
- if (info != null && !info.isNew()) {
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry(part, info, cctx);
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
- info);
- }
- }
-
- if (partMissing)
- continue;
-
- if (cctx.isSwapOrOffheapEnabled()) {
- GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
- cctx.swap().iterator(part);
-
- // Iterator may be null if space does not exist.
- if (iter != null) {
- try {
- boolean prepared = false;
-
- for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + msg.senderId() + ']');
-
- partMissing = true;
-
- break; // For.
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
- if (!reply(node, d, s))
- return;
-
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(), cctx.cacheId());
- }
-
- GridCacheSwapEntry swapEntry = e.getValue();
-
- GridCacheEntryInfo info = new GridCacheEntryInfo();
-
- info.keyBytes(e.getKey());
- info.ttl(swapEntry.ttl());
- info.expireTime(swapEntry.expireTime());
- info.version(swapEntry.version());
- info.value(swapEntry.value());
-
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry0(part, info, cctx);
- else {
- if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not send " +
- "cache entry): " + info);
-
- continue;
- }
-
- // Need to manually prepare cache message.
- if (depEnabled && !prepared) {
- ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
- swapEntry.valueClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
- null;
-
- if (ldr == null)
- continue;
-
- if (ldr instanceof GridDeploymentInfo) {
- s.prepare((GridDeploymentInfo)ldr);
-
- prepared = true;
- }
- }
- }
-
- if (partMissing)
- continue;
- }
- finally {
- iter.close();
- }
- }
- }
-
- // Stop receiving promote notifications.
- if (swapLsnr != null) {
- cctx.swap().removeOffHeapListener(part, swapLsnr);
- cctx.swap().removeSwapListener(part, swapLsnr);
- }
-
- if (swapLsnr != null) {
- Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
-
- swapLsnr = null;
-
- for (GridCacheEntryInfo info : entries) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + msg.senderId() + ']');
-
- // No need to continue iteration over swap entries.
- break;
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
- if (!reply(node, d, s))
- return;
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(),
- cctx.cacheId());
- }
-
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry(part, info, cctx);
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
- info);
- }
- }
-
- // Mark as last supply message.
- s.last(part);
-
- if (ack) {
- s.markAck();
-
- break; // Partition for loop.
- }
- }
- finally {
- loc.release();
-
- if (swapLsnr != null) {
- cctx.swap().removeOffHeapListener(part, swapLsnr);
- cctx.swap().removeSwapListener(part, swapLsnr);
- }
- }
- }
-
- reply(node, d, s);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
- }
- }
-
- /**
- * @param n Node.
- * @param d Demand message.
- * @param s Supply message.
- * @return {@code True} if message was sent, {@code false} if recipient left grid.
- * @throws IgniteCheckedException If failed.
- */
- private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
- throws IgniteCheckedException {
- try {
- if (log.isDebugEnabled())
- log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
-
- cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
-
- return true;
- }
- catch (ClusterTopologyCheckedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Failed to send partition supply message because node left grid: " + n.id());
-
- return false;
- }
- }
- }
-
- /**
- * Demand message wrapper.
- */
- private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param sndId Sender ID.
- * @param msg Message.
- */
- DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) {
- super(sndId, msg);
- }
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public DemandMessage() {
- // No-op.
- }
-
- /**
- * @return Sender ID.
- */
- UUID senderId() {
- return get1();
- }
-
- /**
- * @return Message.
- */
- public GridDhtPartitionDemandMessage message() {
- return get2();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']';
- }
- }
-}
\ No newline at end of file
[3/5] ignite git commit: Ignite-1093
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
new file mode 100644
index 0000000..6479542
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -0,0 +1,1310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.util.GridLeanSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
+import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
+
+/**
+ * Requests partitions from other nodes and populates the local cache with the entries they supply.
+ */
+@SuppressWarnings("NonConstantFieldWithUpperCaseName")
+public class GridDhtPartitionDemander {
+ /** Cache context. */
+ private final GridCacheContext<?, ?> cctx;
+
+ /** Logger (obtained from the cache context). */
+ private final IgniteLogger log;
+
+ /** Preload predicate; when set, only entries it accepts are stored during rebalancing. */
+ private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+ /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
+ @GridToStringInclude
+ private final GridFutureAdapter syncFut = new GridFutureAdapter();
+
+ /** Current rebalance future; starts as a dummy one and is replaced on every new assignment. */
+ @GridToStringInclude
+ private volatile RebalanceFuture rebalanceFut;
+
+ /** Last timeout object scheduled for delayed rebalancing. */
+ private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
+
+ /** Last exchange future. */
+ private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
+
+ /** Demand lock. */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private final ReadWriteLock demandLock;
+
+ /**
+ * Creates a demander for the given cache. If rebalancing is disabled for the cache, or the local node is a
+ * client node, the initial rebalance future and the sync future are completed immediately.
+ *
+ * @param cctx Cache context.
+ * @param demandLock Demand lock.
+ */
+ public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) {
+ assert cctx != null;
+
+ this.cctx = cctx;
+ this.demandLock = demandLock;
+ this.log = cctx.logger(getClass());
+
+ boolean rebalanceDisabled = !cctx.rebalanceEnabled() || cctx.kernalContext().clientNode();
+
+ // Dummy future, it is completed by the first real rebalance future.
+ RebalanceFuture dummyFut = new RebalanceFuture();
+
+ rebalanceFut = dummyFut;
+
+ if (rebalanceDisabled) {
+ // Nothing will be preloaded, so both futures can be finished right away.
+ dummyFut.onDone(true);
+ syncFut.onDone();
+ }
+ }
+
+ /**
+ * Starts the demander. There is nothing to initialize at the moment.
+ */
+ void start() {
+ // No-op.
+ }
+
+ /**
+ * Stops the demander: drops the references to the last exchange future and to the last timeout object.
+ * The timeout object itself is not removed from the timeout processor here.
+ */
+ void stop() {
+ lastExchangeFut = null;
+
+ lastTimeoutObj.set(null);
+ }
+
+ /**
+ * Gets the future that is completed when synchronous rebalancing is finished (or is not required).
+ *
+ * @return Future for {@link CacheRebalanceMode#SYNC} mode.
+ */
+ IgniteInternalFuture<?> syncFuture() {
+ return this.syncFut;
+ }
+
+ /**
+ * Gets the rebalance future that is current at the moment of the call.
+ *
+ * @return Rebalance future.
+ */
+ IgniteInternalFuture<Boolean> rebalanceFuture() {
+ return this.rebalanceFut;
+ }
+
+ /**
+ * Installs the predicate that decides which supplied entries are stored during rebalancing.
+ *
+ * @param preloadPred Preload predicate.
+ */
+ void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+ IgnitePredicate<GridCacheEntryInfo> pred = preloadPred;
+
+ this.preloadPred = pred;
+ }
+
+ /**
+ * Forces rebalancing: removes the pending timeout object (if there is one) and registers a listener on the
+ * last exchange future that asks the exchange manager to force a preload exchange for it. The request is
+ * ignored when no exchange future has been registered yet.
+ */
+ void forcePreload() {
+ GridTimeoutObject pending = lastTimeoutObj.getAndSet(null);
+
+ if (pending != null)
+ cctx.time().removeTimeoutObject(pending);
+
+ final GridDhtPartitionsExchangeFuture lastFut = lastExchangeFut;
+
+ if (lastFut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring force rebalance request (no topology event happened yet).");
+
+ return;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Forcing rebalance event for future: " + lastFut);
+
+ lastFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> ignored) {
+ cctx.shared().exchange().forcePreloadExchange(lastFut);
+ }
+ });
+ }
+
+ /**
+ * Checks whether the given future is outdated, i.e. it was created for another topology version or it is
+ * no longer the current rebalance future.
+ *
+ * @param fut Future.
+ * @return {@code True} if topology changed.
+ */
+ private boolean topologyChanged(RebalanceFuture fut) {
+ // Topology already changed.
+ if (!cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()))
+ return true;
+
+ // Same topology, but dummy exchange forced because of missing partitions.
+ return fut != rebalanceFut;
+ }
+
+ /**
+ * Records a preload event for a partition, taking the node, type and timestamp from the discovery event.
+ *
+ * @param part Partition.
+ * @param type Type.
+ * @param discoEvt Discovery event (must not be {@code null}).
+ */
+ private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+ assert discoEvt != null;
+
+ cctx.events().addPreloadEvent(
+ part,
+ type,
+ discoEvt.eventNode(),
+ discoEvt.type(),
+ discoEvt.timestamp());
+ }
+
+ /**
+ * Waits until the named cache finishes its rebalancing, provided that the topology has not changed and the
+ * other cache's rebalance future has the same update sequence as the given one.
+ *
+ * @param name Cache name.
+ * @param fut Future.
+ * @return {@code True} if the cache was rebalanced without missed partitions, {@code false} if waiting was
+ * skipped or the cache was rebalanced with missed partitions.
+ * @throws IgniteCheckedException If waiting for the other cache's rebalance future failed.
+ */
+ private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for " + name + " cache rebalancing [cacheName=" + cctx.name() + ']');
+
+ RebalanceFuture awaited =
+ (RebalanceFuture)cctx.kernalContext().cache().internalCache(name).preloader().rebalanceFuture();
+
+ if (topologyChanged(fut) || awaited.updateSeq != fut.updateSeq) {
+ U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() +
+ "] (topology already changed)");
+
+ return false;
+ }
+
+ if (awaited.get())
+ return true;
+
+ U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() +
+ "] (cache rebalanced with missed partitions)");
+
+ return false;
+ }
+
+ /**
+ * Registers new partition assignments. If no rebalance delay is configured (or {@code force} is set), a new
+ * {@link RebalanceFuture} replaces the current one and a task performing the actual demand is returned.
+ * If the delay is positive, a timeout object is scheduled that forces a preload exchange after the delay.
+ *
+ * @param assigns Assignments.
+ * @param force {@code True} if dummy reassign.
+ * @param caches Rebalancing of these caches will be finished before this started.
+ * @param cnt Counter, used as the update sequence of the new rebalance future.
+ * @return Task that waits for {@code caches} and then requests partitions, or {@code null} if there is
+ * nothing to run right now (empty assignments or rebalancing is not started immediately).
+ */
+ Callable<Boolean> addAssignments(final GridDhtPreloaderAssignments assigns, boolean force,
+ final Collection<String> caches, int cnt) {
+ if (log.isDebugEnabled())
+ log.debug("Adding partition assignments: " + assigns);
+
+ long delay = cctx.config().getRebalanceDelay();
+
+ if (delay == 0 || force) { // Start right away.
+ assert assigns != null;
+
+ final RebalanceFuture oldFut = rebalanceFut;
+
+ final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt);
+
+ if (!oldFut.isInitial()) // Real future of a previous round: cancel it.
+ oldFut.cancel();
+ else // Initial dummy future: complete it with the result of the first real one.
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> future) {
+ oldFut.onDone(fut.result());
+ }
+ });
+
+ rebalanceFut = fut; // From now on supply messages are matched against the new future.
+
+ if (assigns.isEmpty()) {
+ fut.doneIfEmpty();
+
+ return null;
+ }
+
+ return new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ for (String c : caches) {
+ if (!waitForCacheRebalancing(c, fut))
+ return false;
+ }
+
+ return requestPartitions(fut, assigns);
+ }
+ };
+ }
+ else if (delay > 0) { // Delayed rebalancing: (re)schedule the timeout object.
+ GridTimeoutObject obj = lastTimeoutObj.get();
+
+ if (obj != null)
+ cctx.time().removeTimeoutObject(obj);
+
+ final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+ assert exchFut != null : "Delaying rebalance process without topology event.";
+
+ obj = new GridTimeoutObjectAdapter(delay) {
+ @Override public void onTimeout() {
+ exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
+ cctx.shared().exchange().forcePreloadExchange(exchFut);
+ }
+ });
+ }
+ };
+
+ lastTimeoutObj.set(obj);
+
+ cctx.time().addTimeoutObject(obj);
+ }
+
+ return null;
+ }
+
+ /**
+ * Sends demand messages for the given assignments. The demanded partitions of every supplier node are
+ * registered in the future first. Then, depending on the version of the supplier node, either the partitions
+ * are spread in turn over {@code getRebalanceThreadPoolSize()} rebalance topics and one demand message is sent
+ * per non-empty share, or the old-API {@link DemandWorker} is run.
+ *
+ * @param fut Future.
+ * @param assigns Assignments (supplier node to demand message).
+ * @return {@code False} if topology changed while the future was being prepared, {@code true} otherwise.
+ * @throws IgniteCheckedException If demanding failed.
+ */
+ private boolean requestPartitions(RebalanceFuture fut,
+ GridDhtPreloaderAssignments assigns) throws IgniteCheckedException {
+ for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+ if (topologyChanged(fut))
+ return false;
+
+ final ClusterNode node = e.getKey();
+
+ GridDhtPartitionDemandMessage d = e.getValue();
+
+ fut.appendPartitions(node.id(), d.partitions());// Future preparation: remember what is expected from the node.
+ }
+
+ for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+ final ClusterNode node = e.getKey();
+
+ final CacheConfiguration cfg = cctx.config();
+
+ final Collection<Integer> parts = fut.remaining.get(node.id()).get2();
+
+ GridDhtPartitionDemandMessage d = e.getValue();
+
+ // Check remote node rebalancing API version.
+ if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
+ U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+ ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
+ ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
+
+ int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
+
+ List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+
+ for (int cnt = 0; cnt < lsnrCnt; cnt++)
+ sParts.add(new HashSet<Integer>());
+
+ Iterator<Integer> it = parts.iterator();
+
+ int cnt = 0;
+
+ while (it.hasNext()) // Spread partitions over the shares in turn.
+ sParts.get(cnt++ % lsnrCnt).add(it.next());
+
+ for (cnt = 0; cnt < lsnrCnt; cnt++) {
+ if (!sParts.get(cnt).isEmpty()) {
+
+ // Create a copy of the demand message restricted to this share of partitions.
+ GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+
+ initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
+ initD.updateSequence(fut.updateSeq);
+ initD.timeout(cctx.config().getRebalanceTimeout());
+
+ cctx.io().sendOrderedMessage(node,
+ GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), initD.timeout());
+
+ if (log.isDebugEnabled())
+ log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
+ cnt + ", partitions count=" + sParts.get(cnt).size() +
+ " (" + partitionsList(sParts.get(cnt)) + ")]");
+ }
+ }
+ }
+ else {
+ U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+ ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
+ ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
+
+ d.timeout(cctx.config().getRebalanceTimeout());
+ d.workerId(0);// Old API support.
+
+ DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
+
+ dw.run(node, d);
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Builds a compact, sorted representation of partition ids in which runs of consecutive ids are collapsed
+ * into ranges, e.g. {@code 1-3, 5}.
+ *
+ * @param c Partitions.
+ * @return String representation of partitions list.
+ */
+ private String partitionsList(Collection<Integer> c) {
+ List<Integer> sorted = new ArrayList<>(c);
+
+ Collections.sort(sorted);
+
+ StringBuilder res = new StringBuilder();
+
+ // -1 means that no range has been opened yet.
+ int rangeStart = -1;
+ int last = -1;
+
+ for (int i = 0, size = sorted.size(); i < size; i++) {
+ int cur = sorted.get(i);
+
+ if (rangeStart == -1) {
+ rangeStart = cur;
+ last = cur;
+ }
+
+ // Gap found: flush the finished range and open a new one.
+ if (last < cur - 1) {
+ res.append(rangeStart);
+
+ if (rangeStart != last)
+ res.append("-").append(last);
+
+ res.append(", ");
+
+ rangeStart = cur;
+ }
+
+ // Last element: flush the range that is still open.
+ if (i == size - 1) {
+ res.append(rangeStart);
+
+ if (rangeStart != cur)
+ res.append("-").append(cur);
+ }
+
+ last = cur;
+ }
+
+ return res.toString();
+ }
+
+ /**
+ * Processes a supply message. Received entries are preloaded into local partitions that are in
+ * {@code MOVING} state; finished, skipped and missed partitions are reported to the current rebalance future.
+ * Finally, unless the topology changed or the future is done, the next demand message is sent to the same
+ * node. Messages from unknown nodes, for another update sequence or for an outdated topology are ignored.
+ *
+ * @param idx Index used to build the rebalance topic of the reply demand message.
+ * @param id Node id.
+ * @param supply Supply.
+ */
+ public void handleSupplyMessage(
+ int idx,
+ final UUID id,
+ final GridDhtPartitionSupplyMessageV2 supply) {
+ AffinityTopologyVersion topVer = supply.topologyVersion();
+
+ final RebalanceFuture fut = rebalanceFut;
+
+ ClusterNode node = cctx.node(id);
+
+ if (node == null)
+ return;
+
+ if (!fut.isActual(supply.updateSequence())) // Current future has another update sequence.
+ return; // Supply message is based on another future.
+
+ if (topologyChanged(fut)) { // Topology already changed (for the future that supply message based on).
+ return;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Received supply message: " + supply);
+
+ // Check whether there were class loading errors on unmarshal
+ if (supply.classError() != null) {
+ U.warn(log, "Class got undeployed during preloading: " + supply.classError());
+
+ fut.cancel(id);
+
+ return;
+ }
+
+ final GridDhtPartitionTopology top = cctx.dht().topology();
+
+ try {
+ // Preload.
+ for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+ int p = e.getKey();
+
+ if (cctx.affinity().localNode(p, topVer)) {
+ GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+ assert part != null;
+
+ if (part.state() == MOVING) {
+ boolean reserved = part.reserve();
+
+ assert reserved : "Failed to reserve partition [gridName=" +
+ cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+ part.lock();
+
+ try {
+ // Loop through all received entries and try to preload them.
+ for (GridCacheEntryInfo entry : e.getValue().infos()) {
+ if (!part.preloadingPermitted(entry.key(), entry.version())) {
+ if (log.isDebugEnabled())
+ log.debug("Preloading is not permitted for entry due to " +
+ "evictions [key=" + entry.key() +
+ ", ver=" + entry.version() + ']');
+
+ continue;
+ }
+ if (!preloadEntry(node, p, entry, topVer)) {
+ if (log.isDebugEnabled())
+ log.debug("Got entries for invalid partition during " +
+ "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+
+ break; // Partition became invalid, skip the rest of its entries.
+ }
+ }
+
+ boolean last = supply.last().contains(p);
+
+ // If message was last for this partition,
+ // then we take ownership.
+ if (last) {
+ top.own(part);
+
+ fut.partitionDone(id, p);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished rebalancing partition: " + part);
+ }
+ }
+ finally {
+ part.unlock();
+ part.release();
+ }
+ }
+ else {
+ fut.partitionDone(id, p);
+
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+ }
+ }
+ else {
+ fut.partitionDone(id, p);
+
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+ }
+ }
+
+ // Only request partitions based on latest topology version.
+ for (Integer miss : supply.missed())
+ if (cctx.affinity().localNode(miss, topVer))
+ fut.partitionMissed(id, miss);
+
+ for (Integer miss : supply.missed()) // Missed partitions are not awaited from this node any more.
+ fut.partitionDone(id, miss);
+
+ GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
+ supply.updateSequence(), supply.topologyVersion(), cctx.cacheId());
+
+ d.timeout(cctx.config().getRebalanceTimeout());
+
+ d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
+
+ if (!topologyChanged(fut) && !fut.isDone()) {
+ // Send demand message.
+ cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+ d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+ }
+ }
+ catch (IgniteCheckedException e) { // NOTE(review): every checked failure (not only node left) ends up here.
+ if (log.isDebugEnabled())
+ log.debug("Node left during rebalancing [node=" + node.id() +
+ ", msg=" + e.getMessage() + ']');
+ }
+ }
+
+ /**
+ * Stores a single rebalanced entry in the local cache as its initial value, provided that the preload
+ * predicate (if any) accepts it. May record {@code EVT_CACHE_REBALANCE_OBJECT_LOADED} when the value was
+ * actually set. For an IGFS data cache whose used space exceeds the maximum the entry is ignored.
+ *
+ * @param pick Node picked for preloading.
+ * @param p Partition.
+ * @param entry Preloaded entry.
+ * @param topVer Topology version.
+ * @return {@code False} if partition has become invalid during preloading.
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ * @throws IgniteCheckedException If the rebalanced entry could not be cached.
+ */
+ private boolean preloadEntry(
+ ClusterNode pick,
+ int p,
+ GridCacheEntryInfo entry,
+ AffinityTopologyVersion topVer
+ ) throws IgniteCheckedException {
+ try {
+ GridCacheEntryEx cached = null;
+
+ try {
+ cached = cctx.dht().entryEx(entry.key());
+
+ if (log.isDebugEnabled())
+ log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
+
+ if (cctx.dht().isIgfsDataCache() &&
+ cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
+ LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
+ "value, will ignore rebalance entries)");
+
+ if (cached.markObsoleteIfEmpty(null))
+ cached.context().cache().removeIfObsolete(cached.key());
+
+ return true; // Entry is skipped, but the partition itself is still valid.
+ }
+
+ if (preloadPred == null || preloadPred.apply(entry)) {
+ if (cached.initialValue(
+ entry.value(),
+ entry.version(),
+ entry.ttl(),
+ entry.expireTime(),
+ true,
+ topVer,
+ cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+ )) {
+ cctx.evicts().touch(cached, topVer); // Start tracking.
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
+ cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
+ (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
+ false, null, null, null);
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
+ ", part=" + p + ']');
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
+ cached.key() + ", part=" + p + ']');
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Partition became invalid during rebalancing (will ignore): " + p);
+
+ return false;
+ }
+ }
+ catch (IgniteInterruptedCheckedException e) { // Interruption is rethrown as is, without wrapping.
+ throw e;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
+ cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtPartitionDemander.class, this);
+ }
+
+ /**
+ * Remembers the most recent exchange future; it is used by forced and delayed rebalancing.
+ *
+ * @param lastFut Last future to set.
+ */
+ void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+ this.lastExchangeFut = lastFut;
+ }
+
+ /**
+ * Future tracking one rebalancing round of a cache. It keeps the partitions that are still expected from
+ * every supplier node and completes once nothing remains. The result is {@code true} if the round finished
+ * (or was cancelled) without missed partitions and {@code false} if missed partitions have to be reassigned.
+ */
+ public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
+ /** */
+ private static final long serialVersionUID = 1L;
+
+ /** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */
+ private final boolean sendStoppedEvnt;
+
+ /** Cache context ({@code null} for the dummy future). */
+ private final GridCacheContext<?, ?> cctx;
+
+ /** Logger ({@code null} for the dummy future). */
+ private final IgniteLogger log;
+
+ /** Remaining. T2: startTime, partitions */
+ private final Map<UUID, T2<Long, Collection<Integer>>> remaining = new HashMap<>();
+
+ /** Missed. */
+ private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
+
+ /** Exchange future. */
+ @GridToStringExclude
+ private final GridDhtPartitionsExchangeFuture exchFut;
+
+ /** Topology version. */
+ private final AffinityTopologyVersion topVer;
+
+ /** Unique (per demander) sequence id. */
+ private final long updateSeq;
+
+ /**
+ * @param assigns Assigns.
+ * @param cctx Context.
+ * @param log Logger.
+ * @param sentStopEvnt Stop event flag.
+ * @param updateSeq Update sequence of this future.
+ */
+ RebalanceFuture(GridDhtPreloaderAssignments assigns,
+ GridCacheContext<?, ?> cctx,
+ IgniteLogger log,
+ boolean sentStopEvnt,
+ long updateSeq) {
+ assert assigns != null;
+
+ this.exchFut = assigns.exchangeFuture();
+ this.topVer = assigns.topologyVersion();
+ this.cctx = cctx;
+ this.log = log;
+ this.sendStoppedEvnt = sentStopEvnt;
+ this.updateSeq = updateSeq;
+ }
+
+ /**
+ * Dummy future. Will be done by real one.
+ */
+ public RebalanceFuture() {
+ this.exchFut = null;
+ this.topVer = null;
+ this.cctx = null;
+ this.log = null;
+ this.sendStoppedEvnt = false;
+ this.updateSeq = -1;
+ }
+
+ /**
+ * @return Topology version ({@code null} for the dummy future).
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @param updateSeq Update sequence.
+ * @return true in case future created for specified updateSeq, false in other case.
+ */
+ private boolean isActual(long updateSeq) {
+ return this.updateSeq == updateSeq;
+ }
+
+ /**
+ * @return Is initial (created at demander creation).
+ */
+ private boolean isInitial() {
+ return topVer == null;
+ }
+
+ /**
+ * Registers the partitions expected from a node together with the current time as the start time.
+ *
+ * @param nodeId Node id.
+ * @param parts Parts.
+ */
+ private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
+ synchronized (this) {
+ remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
+ }
+ }
+
+ /**
+ * Completes the future when there is nothing to rebalance.
+ */
+ private void doneIfEmpty() {
+ synchronized (this) {
+ if (isDone())
+ return;
+
+ assert remaining.isEmpty();
+
+ if (log.isDebugEnabled())
+ log.debug("Rebalancing is not required [cache=" + cctx.name() +
+ ", topology=" + topVer + "]");
+
+ checkIsDone();
+ }
+ }
+
+ /**
+ * Cancels this future.
+ *
+ * @return {@code true}.
+ */
+ @Override public boolean cancel() {
+ synchronized (this) {
+ if (isDone())
+ return true;
+
+ remaining.clear();
+
+ // The closing bracket was missing in the original message.
+ U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
+ + ", topology=" + topologyVersion() + ']');
+
+ checkIsDone(true /* cancelled */);
+ }
+
+ return true;
+ }
+
+ /**
+ * Cancels rebalancing from a single node. Does nothing if the future is done or nothing is expected
+ * from the node.
+ *
+ * @param nodeId Node id.
+ */
+ private void cancel(UUID nodeId) {
+ synchronized (this) {
+ if (isDone())
+ return;
+
+ T2<Long, Collection<Integer>> t = remaining.remove(nodeId);
+
+ // Node is not tracked (any more): there is no start time to report and nothing to cancel.
+ if (t == null)
+ return;
+
+ U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+ ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+ ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]"));
+
+ checkIsDone();
+ }
+
+ }
+
+ /**
+ * Remembers a partition the node failed to supply; missed partitions are reassigned on completion.
+ *
+ * @param nodeId Node id.
+ * @param p P.
+ */
+ private void partitionMissed(UUID nodeId, int p) {
+ synchronized (this) {
+ if (isDone())
+ return;
+
+ Collection<Integer> nodeMissed = missed.get(nodeId);
+
+ if (nodeMissed == null) {
+ nodeMissed = new HashSet<>();
+
+ missed.put(nodeId, nodeMissed);
+ }
+
+ nodeMissed.add(p);
+ }
+ }
+
+ /**
+ * Marks a partition as no longer expected from the node and completes the future if nothing remains.
+ *
+ * @param nodeId Node id.
+ * @param p P.
+ */
+ private void partitionDone(UUID nodeId, int p) {
+ synchronized (this) {
+ if (isDone())
+ return;
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+ preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+ exchFut.discoveryEvent());
+
+ // The node's entry is removed as soon as its last partition is done, so it may be absent when
+ // one more notification arrives for the same node while other nodes are still pending.
+ T2<Long, Collection<Integer>> t = remaining.get(nodeId);
+
+ Collection<Integer> parts = t != null ? t.get2() : null;
+
+ if (parts != null) {
+ parts.remove(p);
+
+ if (parts.isEmpty()) {
+ U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
+ "rebalancing [cache=" + cctx.name() +
+ ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+ ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]"));
+
+ remaining.remove(nodeId);
+ }
+ }
+
+ checkIsDone();
+ }
+ }
+
+ /**
+ * @param part Partition.
+ * @param type Type.
+ * @param discoEvt Discovery event.
+ */
+ private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+ assert discoEvt != null;
+
+ cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+ }
+
+ /**
+ * Records an event that is not bound to a partition (partition is passed as {@code -1}).
+ *
+ * @param type Type.
+ * @param discoEvt Discovery event.
+ */
+ private void preloadEvent(int type, DiscoveryEvent discoEvt) {
+ preloadEvent(-1, type, discoEvt);
+ }
+
+ /**
+ * Completes the future if nothing remains (not cancelled).
+ */
+ private void checkIsDone() {
+ checkIsDone(false);
+ }
+
+ /**
+ * Completes the future if nothing remains: with {@code false} and a forced dummy exchange when some
+ * partitions were missed, with {@code true} otherwise. The sync future is completed as well unless the
+ * round was cancelled.
+ *
+ * @param cancelled Is cancelled.
+ */
+ private void checkIsDone(boolean cancelled) {
+ if (remaining.isEmpty()) {
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
+ preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+
+ if (log.isDebugEnabled())
+ log.debug("Completed rebalance future.");
+
+ cctx.shared().exchange().scheduleResendPartitions();
+
+ Collection<Integer> m = new HashSet<>();
+
+ for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
+ if (e.getValue() != null && !e.getValue().isEmpty())
+ m.addAll(e.getValue());
+ }
+
+ if (!m.isEmpty()) {
+ U.log(log, ("Reassigning partitions that were missed: " + m));
+
+ onDone(false); //Finished but has missed partitions, will force dummy exchange
+
+ cctx.shared().exchange().forceDummyExchange(true, exchFut);
+
+ return;
+ }
+
+ if (!cancelled && !cctx.preloader().syncFuture().isDone())
+ ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+
+ onDone(true);
+ }
+ }
+ }
+
+ /**
+ * Pairs an old-API supply message with the id of the node that sent it.
+ */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private static class SupplyMessage {
+ /** Id of the node the message came from. */
+ private UUID sndId;
+
+ /** Wrapped supply message. */
+ private GridDhtPartitionSupplyMessage supply;
+
+ /**
+ * Creates an empty wrapper.
+ */
+ private SupplyMessage() {
+ // No-op.
+ }
+
+ /**
+ * Creates a wrapper for a received message.
+ *
+ * @param sndId Sender ID.
+ * @param supply Supply message.
+ */
+ SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
+ this.supply = supply;
+ this.sndId = sndId;
+ }
+
+ /**
+ * Gets the id of the sender node.
+ *
+ * @return Sender ID.
+ */
+ UUID senderId() {
+ return this.sndId;
+ }
+
+ /**
+ * Gets the wrapped supply message.
+ *
+ * @return Message.
+ */
+ GridDhtPartitionSupplyMessage supply() {
+ return this.supply;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SupplyMessage.class, this);
+ }
+ }
+
+ /** DemandWorker index. */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private final AtomicInteger dmIdx = new AtomicInteger();
+
+ /**
+ *
+ */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private class DemandWorker {
+ /** Worker ID. */
+ private int id;
+
+ /** Partition-to-node assignments. */
+ private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
+
+ /** Message queue. */
+ private final LinkedBlockingDeque<SupplyMessage> msgQ =
+ new LinkedBlockingDeque<>();
+
+ /** Counter. */
+ private long cntr;
+
+ /** Hide worker logger and use cache logger instead. */
+ private IgniteLogger log = GridDhtPartitionDemander.this.log;
+
+ private volatile RebalanceFuture fut;
+
+ /**
+ * Creates a worker bound to the given rebalance future.
+ *
+ * @param id Worker ID (must not be negative).
+ * @param fut Rebalance future this worker reports to.
+ */
+ private DemandWorker(int id, RebalanceFuture fut) {
+ assert id >= 0;
+
+ this.fut = fut;
+ this.id = id;
+ }
+
+ /**
+ * Puts a received supply message into the message queue of this worker.
+ *
+ * @param msg Message.
+ */
+ private void addMessage(SupplyMessage msg) {
+ this.msgQ.offer(msg);
+ }
+
+ /**
+ * @param deque Deque to poll from.
+ * @param time Time to wait.
+ * @return Polled item.
+ * @throws InterruptedException If interrupted.
+ */
+ @Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException {
+ return deque.poll(time, MILLISECONDS);
+ }
+
+ /**
+ * @param idx Unique index for this topic.
+ * @return Topic for partition.
+ */
+ public Object topic(long idx) {
+ return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
+ }
+
+ /**
+ * @param node Node to demand from.
+ * @param topVer Topology version.
+ * @param d Demand message.
+ * @param exchFut Exchange future.
+ * @throws InterruptedException If interrupted.
+ * @throws ClusterTopologyCheckedException If node left.
+ * @throws IgniteCheckedException If failed to send message.
+ */
+ private void demandFromNode(
+ ClusterNode node,
+ final AffinityTopologyVersion topVer,
+ GridDhtPartitionDemandMessage d,
+ GridDhtPartitionsExchangeFuture exchFut
+ ) throws InterruptedException, IgniteCheckedException {
+ GridDhtPartitionTopology top = cctx.dht().topology();
+
+ cntr++;
+
+ d.topic(topic(cntr));
+ d.workerId(id);
+
+ if (topologyChanged(fut))
+ return;
+
+ cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
+ addMessage(new SupplyMessage(nodeId, msg));
+ }
+ });
+
+ try {
+ boolean retry;
+
+ // DoWhile.
+ // =======
+ do {
+ retry = false;
+
+ // Create copy.
+ d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2());
+
+ long timeout = cctx.config().getRebalanceTimeout();
+
+ d.timeout(timeout);
+
+ if (log.isDebugEnabled())
+ log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
+
+ // Send demand message.
+ cctx.io().send(node, d, cctx.ioPolicy());
+
+ // While.
+ // =====
+ while (!topologyChanged(fut)) {
+ SupplyMessage s = poll(msgQ, timeout);
+
+ // If timed out.
+ if (s == null) {
+ if (msgQ.isEmpty()) { // Safety check.
+ U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
+ " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
+ " configuration properties).");
+
+ // Ordered listener was removed if timeout expired.
+ cctx.io().removeOrderedHandler(d.topic());
+
+ // Must create copy to be able to work with IO manager thread local caches.
+ d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2());
+
+ // Create new topic.
+ d.topic(topic(++cntr));
+
+ // Create new ordered listener.
+ cctx.io().addOrderedHandler(d.topic(),
+ new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ @Override public void apply(UUID nodeId,
+ GridDhtPartitionSupplyMessage msg) {
+ addMessage(new SupplyMessage(nodeId, msg));
+ }
+ });
+
+ // Resend message with larger timeout.
+ retry = true;
+
+ break; // While.
+ }
+ else
+ continue; // While.
+ }
+
+ // Check that message was received from expected node.
+ if (!s.senderId().equals(node.id())) {
+ U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
+ ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+
+ continue; // While.
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Received supply message: " + s);
+
+ GridDhtPartitionSupplyMessage supply = s.supply();
+
+ // Check whether there were class loading errors on unmarshal
+ if (supply.classError() != null) {
+ if (log.isDebugEnabled())
+ log.debug("Class got undeployed during preloading: " + supply.classError());
+
+ retry = true;
+
+ // Quit preloading.
+ break;
+ }
+
+ // Preload.
+ for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+ int p = e.getKey();
+
+ if (cctx.affinity().localNode(p, topVer)) {
+ GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+ assert part != null;
+
+ if (part.state() == MOVING) {
+ boolean reserved = part.reserve();
+
+ assert reserved : "Failed to reserve partition [gridName=" +
+ cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+ part.lock();
+
+ try {
+ Collection<Integer> invalidParts = new GridLeanSet<>();
+
+ // Loop through all received entries and try to preload them.
+ for (GridCacheEntryInfo entry : e.getValue().infos()) {
+ if (!invalidParts.contains(p)) {
+ if (!part.preloadingPermitted(entry.key(), entry.version())) {
+ if (log.isDebugEnabled())
+ log.debug("Preloading is not permitted for entry due to " +
+ "evictions [key=" + entry.key() +
+ ", ver=" + entry.version() + ']');
+
+ continue;
+ }
+
+ if (!preloadEntry(node, p, entry, topVer)) {
+ invalidParts.add(p);
+
+ if (log.isDebugEnabled())
+ log.debug("Got entries for invalid partition during " +
+ "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+ }
+ }
+ }
+
+ boolean last = supply.last().contains(p);
+
+ // If message was last for this partition,
+ // then we take ownership.
+ if (last) {
+ fut.partitionDone(node.id(), p);
+
+ top.own(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished rebalancing partition: " + part);
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+ preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+ exchFut.discoveryEvent());
+ }
+ }
+ finally {
+ part.unlock();
+ part.release();
+ }
+ }
+ else {
+ fut.partitionDone(node.id(), p);
+
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+ }
+ }
+ else {
+ fut.partitionDone(node.id(), p);
+
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+ }
+ }
+
+ // Only request partitions based on latest topology version.
+ for (Integer miss : s.supply().missed()) {
+ if (cctx.affinity().localNode(miss, topVer))
+ fut.partitionMissed(node.id(), miss);
+ }
+
+ if (fut.remaining.get(node.id()) == null)
+ break; // While.
+
+ if (s.supply().ack()) {
+ retry = true;
+
+ break;
+ }
+ }
+ }
+ while (retry && !topologyChanged(fut));
+ }
+ finally {
+ cctx.io().removeOrderedHandler(d.topic());
+ }
+ }
+
+ /**
+  * Runs a demand for the given node while holding the read side of {@code demandLock}.
+  * <p>
+  * The exchange future and topology version are read from {@code fut} after the lock is
+  * acquired and are handed, together with the node and the demand message, to
+  * {@code demandFromNode}. The read lock is always released, whether the call completes
+  * normally or throws.
+  *
+  * @param node Node passed to {@code demandFromNode}.
+  * @param d Demand message passed to {@code demandFromNode}.
+  * @throws IgniteCheckedException If the call was interrupted (the original
+  *      {@link InterruptedException} is attached as the cause and the thread's interrupt
+  *      status is restored), or if propagated from {@code demandFromNode}.
+  */
+ public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException {
+     demandLock.readLock().lock();
+
+     try {
+         GridDhtPartitionsExchangeFuture exchFut = fut.exchFut;
+
+         AffinityTopologyVersion topVer = fut.topVer;
+
+         try {
+             demandFromNode(node, topVer, d, exchFut);
+         }
+         catch (InterruptedException e) {
+             // Catching InterruptedException clears the interrupt status; restore it so that
+             // code further up the stack can still observe that the thread was interrupted.
+             Thread.currentThread().interrupt();
+
+             throw new IgniteCheckedException(e);
+         }
+     }
+     finally {
+         demandLock.readLock().unlock();
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
+ }
+ }
+}