You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/08 18:19:22 UTC
[2/2] ignite git commit: 1093
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce23c05c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce23c05c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce23c05c
Branch: refs/heads/ignite-1093-2
Commit: ce23c05c7dee8a24ce44771dc52497e9196b3fec
Parents: 53ba0df
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 8 19:19:10 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 8 19:19:10 2015 +0300
----------------------------------------------------------------------
.../configuration/IgniteConfiguration.java | 8 ++--
.../GridCachePartitionExchangeManager.java | 47 +++++++-------------
.../dht/preloader/GridDhtPartitionDemander.java | 17 ++++---
3 files changed, 28 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce23c05c/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index d5fbafc..d983550 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -149,8 +149,8 @@ public class IgniteConfiguration {
/** Default keep alive time for public thread pool. */
public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
- /** Default limit of threads used at rebalance. 1 demand + 1 supply thread. */
- public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 2;
+ /** Default limit of threads used at rebalance. */
+ public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 1;
/** Default max queue capacity of public thread pool. */
public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
@@ -1344,7 +1344,7 @@ public class IgniteConfiguration {
/**
* Gets count of available rebalancing threads.
* Half will be used for supplying and half for demanding of partitions.
- * Minimum is 2.
+ * Minimum is 1.
* @return count.
*/
public int getRebalanceThreadPoolSize(){
@@ -1354,7 +1354,7 @@ public class IgniteConfiguration {
/**
* Sets count of available rebalancing threads.
* Half will be used for supplying and half for demanding of partitions.
- * Minimum is 2.
+ * Minimum is 1.
* @param size Size.
* @return {@code this} for chaining.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce23c05c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f3e3fe0..bbab008 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -88,8 +88,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
@@ -317,29 +317,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (!cctx.kernalContext().clientNode()) {
- for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2); cnt++) {
+ for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize()); cnt++) {
final int idx = cnt;
- cctx.io().addOrderedHandler(demanderTopic(cnt), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() {
- @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) {
+ cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() {
+ @Override public void apply(final UUID id, final GridCacheMessage m) {
if (!enterBusy())
return;
try {
- cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(idx, id, m);
- }
- finally {
- leaveBusy();
- }
- }
- });
- cctx.io().addOrderedHandler(supplierTopic(cnt), new CI2<UUID, GridDhtPartitionDemandMessage>() {
- @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
- if (!enterBusy())
- return;
-
- try {
- cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(id, m);
+ if (m instanceof GridDhtPartitionSupplyMessageV2)
+ cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(
+ idx, id, (GridDhtPartitionSupplyMessageV2)m);
+ else if (m instanceof GridDhtPartitionDemandMessage)
+ cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(
+ id, (GridDhtPartitionDemandMessage)m);
+ else
+ log.error("Unsupported message type " + m.getClass().getName());
}
finally {
leaveBusy();
@@ -415,16 +409,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param idx
* @return topic
*/
- public static Object demanderTopic(int idx) {
- return TOPIC_CACHE.topic("Demander", idx);
- }
-
- /**
- * @param idx
- * @return topic
- */
- public static Object supplierTopic(int idx) {
- return TOPIC_CACHE.topic("Supplier", idx);
+ public static Object rebalanceTopic(int idx) {
+ return TOPIC_CACHE.topic("Rebalance", idx);
}
/** {@inheritDoc} */
@@ -451,9 +437,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (AffinityReadyFuture f : readyFuts.values())
f.onDone(err);
- for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2); cnt++) {
- cctx.io().removeOrderedHandler(demanderTopic(cnt));
- cctx.io().removeOrderedHandler(supplierTopic(cnt));
+ for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize()); cnt++) {
+ cctx.io().removeOrderedHandler(rebalanceTopic(cnt));
}
U.cancel(exchWorker);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce23c05c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 32595f2..b55e626 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -238,7 +238,7 @@ public class GridDhtPartitionDemander {
fut.init(assigns);
if (assigns.isEmpty()) {
- fut.onDone();
+ fut.checkIsDone();
return;
}
@@ -401,7 +401,7 @@ public class GridDhtPartitionDemander {
fut.append(node.id(), remainings);
- int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
+ int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize());
List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
@@ -422,11 +422,10 @@ public class GridDhtPartitionDemander {
// Create copy.
GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
- initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
-
+ initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
try {
if (!topologyChanged(topVer))
- cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
+ cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
else
fut.onCancel();
}
@@ -633,11 +632,11 @@ public class GridDhtPartitionDemander {
GridDhtPartitionDemandMessage nextD =
new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
- nextD.topic(GridCachePartitionExchangeManager.demanderTopic(idx));
+ nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
if (!topologyChanged(topVer)) {
// Send demand message.
- cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(idx),
+ cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
}
else
@@ -987,7 +986,7 @@ public class GridDhtPartitionDemander {
/**
*
*/
- private void checkIsDone() {
+ public void checkIsDone() {
if (remaining.isEmpty()) {
if (log.isDebugEnabled())
log.debug("Completed sync future.");
@@ -1008,7 +1007,7 @@ public class GridDhtPartitionDemander {
cctx.shared().exchange().scheduleResendPartitions();
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED))
+ if (!cctx.isReplicated() || cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED))
preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
if (lsnr != null)