You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/28 18:21:35 UTC
[4/4] ignite git commit: Ignite-1093 Improved rebalancing
Ignite-1093
Improved rebalancing
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d5718f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d5718f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d5718f2
Branch: refs/heads/ignite-1093-2
Commit: 9d5718f2e20d07f754ce7b5802a08a5b4d491d10
Parents: 37a0505
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Aug 28 19:20:08 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Aug 28 19:20:08 2015 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 40 +-
.../configuration/IgniteConfiguration.java | 30 +
.../apache/ignite/internal/IgniteKernal.java | 1 +
.../ignite/internal/IgniteNodeAttributes.java | 3 +
.../communication/GridIoMessageFactory.java | 7 +-
.../processors/cache/GridCacheIoManager.java | 8 +
.../GridCachePartitionExchangeManager.java | 59 +-
.../processors/cache/GridCachePreloader.java | 20 +-
.../cache/GridCachePreloaderAdapter.java | 12 +-
.../processors/cache/GridCacheProcessor.java | 4 +-
.../preloader/GridDhtPartitionDemandPool.java | 1161 ---------------
.../dht/preloader/GridDhtPartitionDemander.java | 1362 ++++++++++++++++++
.../dht/preloader/GridDhtPartitionSupplier.java | 783 ++++++++++
.../GridDhtPartitionSupplyMessageV2.java | 423 ++++++
.../preloader/GridDhtPartitionSupplyPool.java | 545 -------
.../dht/preloader/GridDhtPreloader.java | 195 ++-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
...CacheP2pUnmarshallingRebalanceErrorTest.java | 3 +-
.../GridCacheRebalancingAsyncSelfTest.java | 85 ++
.../GridCacheRebalancingSyncSelfTest.java | 270 ++++
.../GridCacheReplicatedPreloadSelfTest.java | 20 -
.../testsuites/IgniteCacheTestSuite3.java | 4 +
.../config/benchmark-rebalancing-win.properties | 60 +
.../config/benchmark-rebalancing.properties | 79 +
modules/yardstick/config/ignite-log4j.xml | 143 ++
.../ignite-rebalancing-multicast-config.xml | 174 +++
.../ignite-rebalancing-multicast-win-config.xml | 70 +
modules/yardstick/pom.xml | 8 +-
.../yardstick/IgniteBenchmarkArguments.java | 13 +-
.../cache/IgniteCacheAbstractBenchmark.java | 5 +-
.../IgniteRebalancePutGetBenchmark.java | 72 +
31 files changed, 3876 insertions(+), 1785 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index af2bbe8..8057329 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -57,6 +57,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default rebalance timeout (ms).*/
public static final long DFLT_REBALANCE_TIMEOUT = 10000;
+ /** Default rebalance batches count. */
+ public static final long DFLT_REBALANCE_BATCHES_COUNT = 3;
+
/** Time in milliseconds to wait between rebalance messages to avoid overloading CPU. */
public static final long DFLT_REBALANCE_THROTTLE = 0;
@@ -247,6 +250,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Off-heap memory size. */
private long offHeapMaxMem = DFLT_OFFHEAP_MEMORY;
+ /** Rebalance batches count. */
+ private long rebalanceBatchesCount = DFLT_REBALANCE_BATCHES_COUNT;
+
/** */
private boolean swapEnabled = DFLT_SWAP_ENABLED;
@@ -1293,11 +1299,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return this;
}
+ @Deprecated
/**
- * Gets size of rebalancing thread pool. Note that size serves as a hint and implementation
- * may create more threads for rebalancing than specified here (but never less threads).
- * <p>
- * Default value is {@link #DFLT_REBALANCE_THREAD_POOL_SIZE}.
+ * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
*
* @return Size of rebalancing thread pool.
*/
@@ -1305,9 +1309,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return rebalancePoolSize;
}
+ @Deprecated
/**
- * Sets size of rebalancing thread pool. Note that size serves as a hint and implementation may create more threads
- * for rebalancing than specified here (but never less threads).
+ * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
*
* @param rebalancePoolSize Size of rebalancing thread pool.
* @return {@code this} for chaining.
@@ -1791,6 +1795,30 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
}
/**
+ * To gain better rebalancing performance supplier node can provide mode than one batch at start and provide
+ * one new to each next demand request.
+ *
+ * Gets number of batches generated by supply node at rebalancing start.
+ *
+ * @return batches count
+ */
+ public long getRebalanceBatchesCount() {
+ return rebalanceBatchesCount;
+ }
+
+ /**
+ * Sets number of batches generated by supply node at rebalancing start.
+ *
+ * @param rebalanceBatchesCnt batches count.
+ * @return {@code this} for chaining.
+ */
+ public CacheConfiguration<K, V> setRebalanceBatchesCount(long rebalanceBatchesCnt) {
+ this.rebalanceBatchesCount = rebalanceBatchesCnt;
+
+ return this;
+ }
+
+ /**
* Gets cache store session listener factories.
*
* @return Cache store session listener factories.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 1fa1de4..06fb152 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -141,6 +141,9 @@ public class IgniteConfiguration {
/** Default keep alive time for public thread pool. */
public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
+ /** Default limit of threads used at rebalance. 2 demand + 2 supply threads. */
+ public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 4;
+
/** Default max queue capacity of public thread pool. */
public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
@@ -346,6 +349,9 @@ public class IgniteConfiguration {
/** Client mode flag. */
private Boolean clientMode;
+ /** Rebalance thread pool size. */
+ private int rebalanceThreadPoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE;
+
/** Transactions configuration. */
private TransactionConfiguration txCfg = new TransactionConfiguration();
@@ -1326,6 +1332,30 @@ public class IgniteConfiguration {
return this;
}
+
+ /**
+ * Gets count of available rebalancing threads.
+ * Half will be used for supplying and half for demanding of partitions.
+ * Minimum is 2.
+ * @return count.
+ */
+ public int getRebalanceThreadPoolSize(){
+ return rebalanceThreadPoolSize;
+ }
+
+ /**
+ * Sets count of available rebalancing threads.
+ * Half will be used for supplying and half for demanding of partitions.
+ * Minimum is 2.
+ * @param size Size.
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setRebalanceThreadPoolSize(int size){
+ this.rebalanceThreadPoolSize = size;
+
+ return this;
+ }
+
/**
* Returns a collection of life-cycle beans. These beans will be automatically
* notified of grid life-cycle events. Use life-cycle beans whenever you
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 1db73bf..03110c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1170,6 +1170,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
add(ATTR_MARSHALLER, cfg.getMarshaller().getClass().getName());
add(ATTR_USER_NAME, System.getProperty("user.name"));
add(ATTR_GRID_NAME, gridName);
+ add(REBALANCING_VERSION, 1);
add(ATTR_PEER_CLASSLOADING, cfg.isPeerClassLoadingEnabled());
add(ATTR_DEPLOYMENT_MODE, cfg.getDeploymentMode());
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 10b8df0..c04c69b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -135,6 +135,9 @@ public final class IgniteNodeAttributes {
/** Node consistent id. */
public static final String ATTR_NODE_CONSISTENT_ID = ATTR_PREFIX + ".consistent.id";
+ /** Rebalancing version id. */
+ public static final String REBALANCING_VERSION = ATTR_PREFIX + ".rebalancing.version";
+
/**
* Enforces singleton.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 2acfd2b..9d25de2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -606,7 +606,12 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- // [-3..112] - this
+ case 114:
+ msg = new GridDhtPartitionSupplyMessageV2();
+
+ break;
+
+ // [-3..114] - this
// [120..123] - DR
// [-4..-22] - SQL
default:
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 0ef190e..9e8f8ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -502,6 +502,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
+ case 114: {
+ GridDhtPartitionSupplyMessageV2 req = (GridDhtPartitionSupplyMessageV2)msg;
+
+ U.error(log, "Supply message v2 cannot be unmarshalled.", req.classError());
+ }
+
+ break;
+
default:
throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
+ msg + "]");
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e00d3b7..bf77d1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -48,7 +48,8 @@ import java.util.concurrent.locks.*;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.IgniteSystemProperties.*;
import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.*;
@@ -274,6 +275,41 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (reconnect)
reconnectExchangeFut = new GridFutureAdapter<>();
+ if (!cctx.kernalContext().clientNode()) {
+
+ for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2); cnt++) {
+ final int idx = cnt;
+
+ cctx.io().addOrderedHandler(demanderTopic(cnt), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() {
+ @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) {
+ if (!enterBusy())
+ return;
+
+ try {
+
+ cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(idx, id, m);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+ });
+ cctx.io().addOrderedHandler(supplierTopic(cnt), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+ @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+ if (!enterBusy())
+ return;
+
+ try {
+ cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(id, m);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+ });
+ }
+ }
+
new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
onDiscoveryEvent(cctx.localNodeId(), fut);
@@ -336,6 +372,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
+ /**
+ * @param idx
+ * @return topic
+ */
+ public static Object demanderTopic(int idx) {
+ return TOPIC_CACHE.topic("Demander", idx);
+ }
+
+ /**
+ * @param idx
+ * @return topic
+ */
+ public static Object supplierTopic(int idx) {
+ return TOPIC_CACHE.topic("Supplier", idx);
+ }
+
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
cctx.gridEvents().removeLocalEventListener(discoLsnr);
@@ -360,6 +412,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (AffinityReadyFuture f : readyFuts.values())
f.onDone(err);
+ for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2); cnt++) {
+ cctx.io().removeOrderedHandler(demanderTopic(cnt));
+ cctx.io().removeOrderedHandler(supplierTopic(cnt));
+ }
+
U.cancel(exchWorker);
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index b8bb08e..105bec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -91,7 +91,7 @@ public interface GridCachePreloader {
* @param assignments Assignments to add.
* @param forcePreload Force preload flag.
*/
- public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
+ public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException;
/**
* @param p Preload predicate.
@@ -132,4 +132,22 @@ public interface GridCachePreloader {
* Unwinds undeploys.
*/
public void unwindUndeploys();
+
+
+ /**
+ * Handles Supply message.
+ *
+ * @param idx Index.
+ * @param id Node Id.
+ * @param s Supply message.
+ */
+ public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s);
+
+ /**
+ * Handles Demand message.
+ *
+ * @param id Node Id.
+ * @param d Demand message.
+ */
+ public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 0adf510..527e5bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -117,6 +117,16 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
+ @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
return new GridFinishedFuture<>();
}
@@ -142,7 +152,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+ @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
// No-op.
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index dd4d30b..f283008 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -324,10 +324,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" +
U.maskName(cc.getName()) + ']');
- if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) {
- assertParameter(cc.getRebalanceThreadPoolSize() > 0, "rebalanceThreadPoolSize > 0");
+ if (cc.getRebalanceMode() != CacheRebalanceMode.NONE)
assertParameter(cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0");
- }
if (cc.getCacheMode() == PARTITIONED || cc.getCacheMode() == REPLICATED) {
if (cc.getAtomicityMode() == ATOMIC && cc.getWriteSynchronizationMode() == FULL_ASYNC)
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
deleted file mode 100644
index a6e6c4d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ /dev/null
@@ -1,1161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.timeout.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
-import static org.apache.ignite.internal.processors.dr.GridDrType.*;
-
-/**
- * Thread pool for requesting partitions from other nodes
- * and populating local cache.
- */
-@SuppressWarnings("NonConstantFieldWithUpperCaseName")
-public class GridDhtPartitionDemandPool {
- /** Dummy message to wake up a blocking queue if a node leaves. */
- private final SupplyMessage DUMMY_TOP = new SupplyMessage();
-
- /** */
- private final GridCacheContext<?, ?> cctx;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private final ReadWriteLock busyLock;
-
- /** */
- @GridToStringInclude
- private final Collection<DemandWorker> dmdWorkers;
-
- /** Preload predicate. */
- private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
- /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
- @GridToStringInclude
- private SyncFuture syncFut;
-
- /** Preload timeout. */
- private final AtomicLong timeout;
-
- /** Allows demand threads to synchronize their step. */
- private CyclicBarrier barrier;
-
- /** Demand lock. */
- private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
-
- /** */
- private int poolSize;
-
- /** Last timeout object. */
- private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
-
- /** Last exchange future. */
- private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
-
- /**
- * @param cctx Cache context.
- * @param busyLock Shutdown lock.
- */
- public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
- assert cctx != null;
- assert busyLock != null;
-
- this.cctx = cctx;
- this.busyLock = busyLock;
-
- log = cctx.logger(getClass());
-
- boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
-
- poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
- if (enabled) {
- barrier = new CyclicBarrier(poolSize);
-
- dmdWorkers = new ArrayList<>(poolSize);
-
- for (int i = 0; i < poolSize; i++)
- dmdWorkers.add(new DemandWorker(i));
-
- syncFut = new SyncFuture(dmdWorkers);
- }
- else {
- dmdWorkers = Collections.emptyList();
-
- syncFut = new SyncFuture(dmdWorkers);
-
- // Calling onDone() immediately since preloading is disabled.
- syncFut.onDone();
- }
-
- timeout = new AtomicLong(cctx.config().getRebalanceTimeout());
- }
-
- /**
- *
- */
- void start() {
- if (poolSize > 0) {
- for (DemandWorker w : dmdWorkers)
- new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start();
- }
- }
-
- /**
- *
- */
- void stop() {
- U.cancel(dmdWorkers);
-
- if (log.isDebugEnabled())
- log.debug("Before joining on demand workers: " + dmdWorkers);
-
- U.join(dmdWorkers, log);
-
- if (log.isDebugEnabled())
- log.debug("After joining on demand workers: " + dmdWorkers);
-
- lastExchangeFut = null;
-
- lastTimeoutObj.set(null);
- }
-
- /**
- * @return Future for {@link CacheRebalanceMode#SYNC} mode.
- */
- IgniteInternalFuture<?> syncFuture() {
- return syncFut;
- }
-
- /**
- * Sets preload predicate for demand pool.
- *
- * @param preloadPred Preload predicate.
- */
- void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
- this.preloadPred = preloadPred;
- }
-
- /**
- * @return Size of this thread pool.
- */
- int poolSize() {
- return poolSize;
- }
-
- /**
- * Wakes up demand workers when new exchange future was added.
- */
- void onExchangeFutureAdded() {
- synchronized (dmdWorkers) {
- for (DemandWorker w : dmdWorkers)
- w.addMessage(DUMMY_TOP);
- }
- }
-
- /**
- * Force preload.
- */
- void forcePreload() {
- GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
-
- if (obj != null)
- cctx.time().removeTimeoutObject(obj);
-
- final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
- if (exchFut != null) {
- if (log.isDebugEnabled())
- log.debug("Forcing rebalance event for future: " + exchFut);
-
- exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.shared().exchange().forcePreloadExchange(exchFut);
- }
- });
- }
- else if (log.isDebugEnabled())
- log.debug("Ignoring force rebalance request (no topology event happened yet).");
- }
-
- /**
- * @return {@code true} if entered to busy state.
- */
- private boolean enterBusy() {
- if (busyLock.readLock().tryLock())
- return true;
-
- if (log.isDebugEnabled())
- log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId());
-
- return false;
- }
-
- /**
- *
- */
- private void leaveBusy() {
- busyLock.readLock().unlock();
- }
-
- /**
- * @param type Type.
- * @param discoEvt Discovery event.
- */
- private void preloadEvent(int type, DiscoveryEvent discoEvt) {
- preloadEvent(-1, type, discoEvt);
- }
-
- /**
- * @param part Partition.
- * @param type Type.
- * @param discoEvt Discovery event.
- */
- private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
- assert discoEvt != null;
-
- cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
- }
-
- /**
- * @param msg Message to check.
- * @return {@code True} if dummy message.
- */
- private boolean dummyTopology(SupplyMessage msg) {
- return msg == DUMMY_TOP;
- }
-
- /**
- * @param deque Deque to poll from.
- * @param time Time to wait.
- * @param w Worker.
- * @return Polled item.
- * @throws InterruptedException If interrupted.
- */
- @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
- assert w != null;
-
- // There is currently a case where {@code interrupted}
- // flag on a thread gets flipped during stop which causes the pool to hang. This check
- // will always make sure that interrupted flag gets reset before going into wait conditions.
- // The true fix should actually make sure that interrupted flag does not get reset or that
- // interrupted exception gets propagated. Until we find a real fix, this method should
- // always work to make sure that there is no hanging during stop.
- if (w.isCancelled())
- Thread.currentThread().interrupt();
-
- return deque.poll(time, MILLISECONDS);
- }
-
- /**
- * @param p Partition.
- * @param topVer Topology version.
- * @return Picked owners.
- */
- private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
- Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
-
- int affCnt = affNodes.size();
-
- Collection<ClusterNode> rmts = remoteOwners(p, topVer);
-
- int rmtCnt = rmts.size();
-
- if (rmtCnt <= affCnt)
- return rmts;
-
- List<ClusterNode> sorted = new ArrayList<>(rmts);
-
- // Sort in descending order, so nodes with higher order will be first.
- Collections.sort(sorted, CU.nodeComparator(false));
-
- // Pick newest nodes.
- return sorted.subList(0, affCnt);
- }
-
- /**
- * @param p Partition.
- * @param topVer Topology version.
- * @return Nodes owning this partition.
- */
- private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
- return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
- }
-
- /**
- * @param assigns Assignments.
- * @param force {@code True} if dummy reassign.
- */
- void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
- if (log.isDebugEnabled())
- log.debug("Adding partition assignments: " + assigns);
-
- long delay = cctx.config().getRebalanceDelay();
-
- if (delay == 0 || force) {
- assert assigns != null;
-
- synchronized (dmdWorkers) {
- for (DemandWorker w : dmdWorkers) {
- w.addAssignments(assigns);
-
- w.addMessage(DUMMY_TOP);
- }
- }
- }
- else if (delay > 0) {
- assert !force;
-
- GridTimeoutObject obj = lastTimeoutObj.get();
-
- if (obj != null)
- cctx.time().removeTimeoutObject(obj);
-
- final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
- assert exchFut != null : "Delaying rebalance process without topology event.";
-
- obj = new GridTimeoutObjectAdapter(delay) {
- @Override public void onTimeout() {
- exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
- cctx.shared().exchange().forcePreloadExchange(exchFut);
- }
- });
- }
- };
-
- lastTimeoutObj.set(obj);
-
- cctx.time().addTimeoutObject(obj);
- }
- }
-
- /**
- *
- */
- void unwindUndeploys() {
- demandLock.writeLock().lock();
-
- try {
- cctx.deploy().unwind(cctx);
- }
- finally {
- demandLock.writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtPartitionDemandPool.class, this);
- }
-
- /**
- *
- */
- private class DemandWorker extends GridWorker {
- /** Worker ID. */
- private int id;
-
- /** Partition-to-node assignments. */
- private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
-
- /** Message queue. */
- private final LinkedBlockingDeque<SupplyMessage> msgQ =
- new LinkedBlockingDeque<>();
-
- /** Counter. */
- private long cntr;
-
- /** Hide worker logger and use cache logger instead. */
- private IgniteLogger log = GridDhtPartitionDemandPool.this.log;
-
- /**
- * @param id Worker ID.
- */
- private DemandWorker(int id) {
- super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemandPool.this.log);
-
- assert id >= 0;
-
- this.id = id;
- }
-
- /**
- * @param assigns Assignments.
- */
- void addAssignments(GridDhtPreloaderAssignments assigns) {
- assert assigns != null;
-
- assignQ.offer(assigns);
-
- if (log.isDebugEnabled())
- log.debug("Added assignments to worker: " + this);
- }
-
- /**
- * @return {@code True} if topology changed.
- */
- private boolean topologyChanged() {
- return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged();
- }
-
- /**
- * @param msg Message.
- */
- private void addMessage(SupplyMessage msg) {
- if (!enterBusy())
- return;
-
- try {
- assert dummyTopology(msg) || msg.supply().workerId() == id;
-
- msgQ.offer(msg);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- * @param timeout Timed out value.
- */
- private void growTimeout(long timeout) {
- long newTimeout = (long)(timeout * 1.5D);
-
- // Account for overflow.
- if (newTimeout < 0)
- newTimeout = Long.MAX_VALUE;
-
- // Grow by 50% only if another thread didn't do it already.
- if (GridDhtPartitionDemandPool.this.timeout.compareAndSet(timeout, newTimeout))
- U.warn(log, "Increased rebalancing message timeout from " + timeout + "ms to " +
- newTimeout + "ms.");
- }
-
- /**
- * @param pick Node picked for preloading.
- * @param p Partition.
- * @param entry Preloaded entry.
- * @param topVer Topology version.
- * @return {@code False} if partition has become invalid during preloading.
- * @throws IgniteInterruptedCheckedException If interrupted.
- */
- private boolean preloadEntry(
- ClusterNode pick,
- int p,
- GridCacheEntryInfo entry,
- AffinityTopologyVersion topVer
- ) throws IgniteCheckedException {
- try {
- GridCacheEntryEx cached = null;
-
- try {
- cached = cctx.dht().entryEx(entry.key());
-
- if (log.isDebugEnabled())
- log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
-
- if (cctx.dht().isIgfsDataCache() &&
- cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
- LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
- "value, will ignore rebalance entries): " + name());
-
- if (cached.markObsoleteIfEmpty(null))
- cached.context().cache().removeIfObsolete(cached.key());
-
- return true;
- }
-
- if (preloadPred == null || preloadPred.apply(entry)) {
- if (cached.initialValue(
- entry.value(),
- entry.version(),
- entry.ttl(),
- entry.expireTime(),
- true,
- topVer,
- cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
- )) {
- cctx.evicts().touch(cached, topVer); // Start tracking.
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
- cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
- (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
- false, null, null, null);
- }
- else if (log.isDebugEnabled())
- log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
- ", part=" + p + ']');
- }
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
- cached.key() + ", part=" + p + ']');
- }
- catch (GridDhtInvalidPartitionException ignored) {
- if (log.isDebugEnabled())
- log.debug("Partition became invalid during rebalancing (will ignore): " + p);
-
- return false;
- }
- }
- catch (IgniteInterruptedCheckedException e) {
- throw e;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
- cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
- }
-
- return true;
- }
-
- /**
- * @param idx Unique index for this topic.
- * @return Topic for partition.
- */
- public Object topic(long idx) {
- return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
- }
-
- /**
- * @param node Node to demand from.
- * @param topVer Topology version.
- * @param d Demand message.
- * @param exchFut Exchange future.
- * @return Missed partitions.
- * @throws InterruptedException If interrupted.
- * @throws ClusterTopologyCheckedException If node left.
- * @throws IgniteCheckedException If failed to send message.
- */
- private Set<Integer> demandFromNode(
- ClusterNode node,
- final AffinityTopologyVersion topVer,
- GridDhtPartitionDemandMessage d,
- GridDhtPartitionsExchangeFuture exchFut
- ) throws InterruptedException, IgniteCheckedException {
- GridDhtPartitionTopology top = cctx.dht().topology();
-
- cntr++;
-
- d.topic(topic(cntr));
- d.workerId(id);
-
- Set<Integer> missed = new HashSet<>();
-
- // Get the same collection that will be sent in the message.
- Collection<Integer> remaining = d.partitions();
-
- // Drain queue before processing a new node.
- drainQueue();
-
- if (isCancelled() || topologyChanged())
- return missed;
-
- cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
- addMessage(new SupplyMessage(nodeId, msg));
- }
- });
-
- try {
- boolean retry;
-
- // DoWhile.
- // =======
- do {
- retry = false;
-
- // Create copy.
- d = new GridDhtPartitionDemandMessage(d, remaining);
-
- long timeout = GridDhtPartitionDemandPool.this.timeout.get();
-
- d.timeout(timeout);
-
- if (log.isDebugEnabled())
- log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
-
- // Send demand message.
- cctx.io().send(node, d, cctx.ioPolicy());
-
- // While.
- // =====
- while (!isCancelled() && !topologyChanged()) {
- SupplyMessage s = poll(msgQ, timeout, this);
-
- // If timed out.
- if (s == null) {
- if (msgQ.isEmpty()) { // Safety check.
- U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
- " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
- " configuration properties).");
-
- growTimeout(timeout);
-
- // Ordered listener was removed if timeout expired.
- cctx.io().removeOrderedHandler(d.topic());
-
- // Must create copy to be able to work with IO manager thread local caches.
- d = new GridDhtPartitionDemandMessage(d, remaining);
-
- // Create new topic.
- d.topic(topic(++cntr));
-
- // Create new ordered listener.
- cctx.io().addOrderedHandler(d.topic(),
- new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(UUID nodeId,
- GridDhtPartitionSupplyMessage msg) {
- addMessage(new SupplyMessage(nodeId, msg));
- }
- });
-
- // Resend message with larger timeout.
- retry = true;
-
- break; // While.
- }
- else
- continue; // While.
- }
-
- // If topology changed.
- if (dummyTopology(s)) {
- if (topologyChanged())
- break; // While.
- else
- continue; // While.
- }
-
- // Check that message was received from expected node.
- if (!s.senderId().equals(node.id())) {
- U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
- ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
-
- continue; // While.
- }
-
- if (log.isDebugEnabled())
- log.debug("Received supply message: " + s);
-
- GridDhtPartitionSupplyMessage supply = s.supply();
-
- // Check whether there were class loading errors on unmarshal
- if (supply.classError() != null) {
- if (log.isDebugEnabled())
- log.debug("Class got undeployed during preloading: " + supply.classError());
-
- retry = true;
-
- // Quit preloading.
- break;
- }
-
- // Preload.
- for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
- int p = e.getKey();
-
- if (cctx.affinity().localNode(p, topVer)) {
- GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
- assert part != null;
-
- if (part.state() == MOVING) {
- boolean reserved = part.reserve();
-
- assert reserved : "Failed to reserve partition [gridName=" +
- cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
-
- part.lock();
-
- try {
- Collection<Integer> invalidParts = new GridLeanSet<>();
-
- // Loop through all received entries and try to preload them.
- for (GridCacheEntryInfo entry : e.getValue().infos()) {
- if (!invalidParts.contains(p)) {
- if (!part.preloadingPermitted(entry.key(), entry.version())) {
- if (log.isDebugEnabled())
- log.debug("Preloading is not permitted for entry due to " +
- "evictions [key=" + entry.key() +
- ", ver=" + entry.version() + ']');
-
- continue;
- }
-
- if (!preloadEntry(node, p, entry, topVer)) {
- invalidParts.add(p);
-
- if (log.isDebugEnabled())
- log.debug("Got entries for invalid partition during " +
- "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
- }
- }
- }
-
- boolean last = supply.last().contains(p);
-
- // If message was last for this partition,
- // then we take ownership.
- if (last) {
- remaining.remove(p);
-
- top.own(part);
-
- if (log.isDebugEnabled())
- log.debug("Finished rebalancing partition: " + part);
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
- preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
- exchFut.discoveryEvent());
- }
- }
- finally {
- part.unlock();
- part.release();
- }
- }
- else {
- remaining.remove(p);
-
- if (log.isDebugEnabled())
- log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
- }
- }
- else {
- remaining.remove(p);
-
- if (log.isDebugEnabled())
- log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
- }
- }
-
- remaining.removeAll(s.supply().missed());
-
- // Only request partitions based on latest topology version.
- for (Integer miss : s.supply().missed())
- if (cctx.affinity().localNode(miss, topVer))
- missed.add(miss);
-
- if (remaining.isEmpty())
- break; // While.
-
- if (s.supply().ack()) {
- retry = true;
-
- break;
- }
- }
- }
- while (retry && !isCancelled() && !topologyChanged());
-
- return missed;
- }
- finally {
- cctx.io().removeOrderedHandler(d.topic());
- }
- }
-
- /**
- * @throws InterruptedException If interrupted.
- */
- private void drainQueue() throws InterruptedException {
- while (!msgQ.isEmpty()) {
- SupplyMessage msg = msgQ.take();
-
- if (log.isDebugEnabled())
- log.debug("Drained supply message: " + msg);
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- try {
- int rebalanceOrder = cctx.config().getRebalanceOrder();
-
- if (!CU.isMarshallerCache(cctx.name())) {
- if (log.isDebugEnabled())
- log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
-
- try {
- cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
- "[cacheName=" + cctx.name() + ']');
-
- return;
- }
- catch (IgniteCheckedException e) {
- throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
- }
- }
-
- if (rebalanceOrder > 0) {
- IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
-
- try {
- if (fut != null) {
- if (log.isDebugEnabled())
- log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
- ", rebalanceOrder=" + rebalanceOrder + ']');
-
- fut.get();
- }
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
- "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
-
- return;
- }
- catch (IgniteCheckedException e) {
- throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
- }
- }
-
- GridDhtPartitionsExchangeFuture exchFut = null;
-
- boolean stopEvtFired = false;
-
- while (!isCancelled()) {
- try {
- barrier.await();
-
- if (id == 0 && exchFut != null && !exchFut.dummy() &&
- cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) {
-
- if (!cctx.isReplicated() || !stopEvtFired) {
- preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
- stopEvtFired = true;
- }
- }
- }
- catch (BrokenBarrierException ignore) {
- throw new InterruptedException("Demand worker stopped.");
- }
-
- // Sync up all demand threads at this step.
- GridDhtPreloaderAssignments assigns = null;
-
- while (assigns == null)
- assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
-
- demandLock.readLock().lock();
-
- try {
- exchFut = assigns.exchangeFuture();
-
- // Assignments are empty if preloading is disabled.
- if (assigns.isEmpty())
- continue;
-
- boolean resync = false;
-
- // While.
- // =====
- while (!isCancelled() && !topologyChanged() && !resync) {
- Collection<Integer> missed = new HashSet<>();
-
- // For.
- // ===
- for (ClusterNode node : assigns.keySet()) {
- if (topologyChanged() || isCancelled())
- break; // For.
-
- GridDhtPartitionDemandMessage d = assigns.remove(node);
-
- // If another thread is already processing this message,
- // move to the next node.
- if (d == null)
- continue; // For.
-
- try {
- Set<Integer> set = demandFromNode(node, assigns.topologyVersion(), d, exchFut);
-
- if (!set.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
- set + ']');
-
- missed.addAll(set);
- }
- }
- catch (IgniteInterruptedCheckedException e) {
- throw e;
- }
- catch (ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
- ", msg=" + e.getMessage() + ']');
-
- resync = true;
-
- break; // For.
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to receive partitions from node (rebalancing will not " +
- "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
- }
- }
-
- // Processed missed entries.
- if (!missed.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Reassigning partitions that were missed: " + missed);
-
- assert exchFut.exchangeId() != null;
-
- cctx.shared().exchange().forceDummyExchange(true, exchFut);
- }
- else
- break; // While.
- }
- }
- finally {
- demandLock.readLock().unlock();
-
- syncFut.onWorkerDone(this);
- }
-
- cctx.shared().exchange().scheduleResendPartitions();
- }
- }
- finally {
- // Safety.
- syncFut.onWorkerDone(this);
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
- }
- }
-
- /**
- * Sets last exchange future.
- *
- * @param lastFut Last future to set.
- */
- void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
- lastExchangeFut = lastFut;
- }
-
- /**
- * @param exchFut Exchange future.
- * @return Assignments of partitions to nodes.
- */
- GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
- // No assignments for disabled preloader.
- GridDhtPartitionTopology top = cctx.dht().topology();
-
- if (!cctx.rebalanceEnabled())
- return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
- int partCnt = cctx.affinity().partitions();
-
- assert exchFut.forcePreload() || exchFut.dummyReassign() ||
- exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
- "Topology version mismatch [exchId=" + exchFut.exchangeId() +
- ", topVer=" + top.topologyVersion() + ']';
-
- GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
- AffinityTopologyVersion topVer = assigns.topologyVersion();
-
- for (int p = 0; p < partCnt; p++) {
- if (cctx.shared().exchange().hasPendingExchange()) {
- if (log.isDebugEnabled())
- log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
- exchFut.exchangeId());
-
- break;
- }
-
- // If partition belongs to local node.
- if (cctx.affinity().localNode(p, topVer)) {
- GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
- assert part != null;
- assert part.id() == p;
-
- if (part.state() != MOVING) {
- if (log.isDebugEnabled())
- log.debug("Skipping partition assignment (state is not MOVING): " + part);
-
- continue; // For.
- }
-
- Collection<ClusterNode> picked = pickedOwners(p, topVer);
-
- if (picked.isEmpty()) {
- top.own(part);
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
- DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
- cctx.events().addPreloadEvent(p,
- EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
- discoEvt.type(), discoEvt.timestamp());
- }
-
- if (log.isDebugEnabled())
- log.debug("Owning partition as there are no other owners: " + part);
- }
- else {
- ClusterNode n = F.first(picked);
-
- GridDhtPartitionDemandMessage msg = assigns.get(n);
-
- if (msg == null) {
- assigns.put(n, msg = new GridDhtPartitionDemandMessage(
- top.updateSequence(),
- exchFut.exchangeId().topologyVersion(),
- cctx.cacheId()));
- }
-
- msg.addPartition(p);
- }
- }
- }
-
- return assigns;
- }
-
- /**
- *
- */
- private class SyncFuture extends GridFutureAdapter<Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Remaining workers. */
- private Collection<DemandWorker> remaining;
-
- /**
- * @param workers List of workers.
- */
- private SyncFuture(Collection<DemandWorker> workers) {
- assert workers.size() == poolSize();
-
- remaining = Collections.synchronizedList(new LinkedList<>(workers));
- }
-
- /**
- * @param w Worker who iterated through all partitions.
- */
- void onWorkerDone(DemandWorker w) {
- if (isDone())
- return;
-
- if (remaining.remove(w))
- if (log.isDebugEnabled())
- log.debug("Completed full partition iteration for worker [worker=" + w + ']');
-
- if (remaining.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Completed sync future.");
-
- onDone();
- }
- }
- }
-
- /**
- * Supply message wrapper.
- */
- private static class SupplyMessage {
- /** Sender ID. */
- private UUID sndId;
-
- /** Supply message. */
- private GridDhtPartitionSupplyMessage supply;
-
- /**
- * Dummy constructor.
- */
- private SupplyMessage() {
- // No-op.
- }
-
- /**
- * @param sndId Sender ID.
- * @param supply Supply message.
- */
- SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
- this.sndId = sndId;
- this.supply = supply;
- }
-
- /**
- * @return Sender ID.
- */
- UUID senderId() {
- return sndId;
- }
-
- /**
- * @return Message.
- */
- GridDhtPartitionSupplyMessage supply() {
- return supply;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(SupplyMessage.class, this);
- }
- }
-}