You are viewing a plain text version of this content. (The canonical link that accompanied this notice is not included in this text version.)
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/08 18:19:21 UTC

[1/2] ignite git commit: 1093

Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 3a6bc565c -> ce23c05c7


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53ba0df4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53ba0df4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53ba0df4

Branch: refs/heads/ignite-1093-2
Commit: 53ba0df420d51d24fdaddcb062d0edc9921cded6
Parents: 3a6bc56
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 8 10:35:57 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 8 10:35:57 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 34 ++++++++++----------
 1 file changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/53ba0df4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 7a0a94c..32595f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -249,7 +249,7 @@ public class GridDhtPartitionDemander {
                 return;
             }
 
-            final SyncFuture cSF = fut;
+            final SyncFuture curFut = fut;
 
             IgniteThread thread = new IgniteThread(cctx.gridName(), "demand-thread-" + cctx.cache().name(), new Runnable() {
                 @Override public void run() {
@@ -258,22 +258,22 @@ public class GridDhtPartitionDemander {
                             log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
 
                         try {
-                            IgniteInternalFuture fut;
+                            IgniteInternalFuture mFut;
                             do {
-                                fut = cctx.kernalContext().cache().marshallerCache().preloader().syncFuture();
+                                mFut = cctx.kernalContext().cache().marshallerCache().preloader().syncFuture();
                             }
-                            while (!((SyncFuture)fut).isInited() || ((SyncFuture)fut).topologyVersion().topologyVersion() < cSF.topologyVersion().topologyVersion());
+                            while (!((SyncFuture)mFut).isInited() || ((SyncFuture)mFut).topologyVersion().topologyVersion() < curFut.topologyVersion().topologyVersion());
 
-                            if (((SyncFuture)fut).topologyVersion().topologyVersion() > cSF.topologyVersion().topologyVersion()) {
-                                cSF.onCancel();
+                            if (((SyncFuture)mFut).topologyVersion().topologyVersion() > curFut.topologyVersion().topologyVersion()) {
+                                curFut.onCancel();
 
                                 return;
                             }
 
                             if (!topologyChanged(topVer))
-                                fut.get();
+                                mFut.get();
                             else {
-                                cSF.onCancel();
+                                curFut.onCancel();
 
                                 return;
                             }
@@ -282,13 +282,13 @@ public class GridDhtPartitionDemander {
                             if (log.isDebugEnabled()) {
                                 log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
                                     "[cacheName=" + cctx.name() + ']');
-                                cSF.onCancel();
+                                curFut.onCancel();
 
                                 return;
                             }
                         }
                         catch (IgniteCheckedException e) {
-                            cSF.onCancel();
+                            curFut.onCancel();
 
                             throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
                         }
@@ -297,18 +297,18 @@ public class GridDhtPartitionDemander {
                     int rebalanceOrder = cctx.config().getRebalanceOrder();
 
                     if (rebalanceOrder > 0) {
-                        IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
+                        IgniteInternalFuture<?> oFut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
 
                         try {
-                            if (fut != null) {
+                            if (oFut != null) {
                                 if (log.isDebugEnabled())
                                     log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
                                         ", rebalanceOrder=" + rebalanceOrder + ']');
 
                                 if (!topologyChanged(topVer))
-                                    fut.get();
+                                    oFut.get();
                                 else {
-                                    cSF.onCancel();
+                                    curFut.onCancel();
 
                                     return;
                                 }
@@ -318,19 +318,19 @@ public class GridDhtPartitionDemander {
                             if (log.isDebugEnabled()) {
                                 log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
                                     "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
-                                cSF.onCancel();
+                                curFut.onCancel();
 
                                 return;
                             }
                         }
                         catch (IgniteCheckedException e) {
-                            cSF.onCancel();
+                            curFut.onCancel();
 
                             throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
                         }
                     }
 
-                    requestPartitions(cSF);
+                    requestPartitions(curFut);
                 }
             });
 


[2/2] ignite git commit: 1093

Posted by sb...@apache.org.
1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce23c05c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce23c05c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce23c05c

Branch: refs/heads/ignite-1093-2
Commit: ce23c05c7dee8a24ce44771dc52497e9196b3fec
Parents: 53ba0df
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 8 19:19:10 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 8 19:19:10 2015 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |  8 ++--
 .../GridCachePartitionExchangeManager.java      | 47 +++++++-------------
 .../dht/preloader/GridDhtPartitionDemander.java | 17 ++++---
 3 files changed, 28 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ce23c05c/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index d5fbafc..d983550 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -149,8 +149,8 @@ public class IgniteConfiguration {
     /** Default keep alive time for public thread pool. */
     public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
 
-    /** Default limit of threads used at rebalance. 1 demand + 1 supply thread. */
-    public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 2;
+    /** Default limit of threads used at rebalance. */
+    public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 1;
 
     /** Default max queue capacity of public thread pool. */
     public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
@@ -1344,7 +1344,7 @@ public class IgniteConfiguration {
     /**
      * Gets count of available rebalancing threads.
      * Half will be used for supplying and half for demanding of partitions.
-     * Minimum is 2.
+     * Minimum is 1.
      * @return count.
      */
     public int getRebalanceThreadPoolSize(){
@@ -1354,7 +1354,7 @@ public class IgniteConfiguration {
     /**
      * Sets count of available rebalancing threads.
      * Half will be used for supplying and half for demanding of partitions.
-     * Minimum is 2.
+     * Minimum is 1.
      * @param size Size.
      * @return {@code this} for chaining.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce23c05c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f3e3fe0..bbab008 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -88,8 +88,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
 
@@ -317,29 +317,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         if (!cctx.kernalContext().clientNode()) {
 
-            for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2); cnt++) {
+            for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize()); cnt++) {
                 final int idx = cnt;
 
-                cctx.io().addOrderedHandler(demanderTopic(cnt), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() {
-                    @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) {
+                cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() {
+                    @Override public void apply(final UUID id, final GridCacheMessage m) {
                         if (!enterBusy())
                             return;
 
                         try {
-                            cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(idx, id, m);
-                        }
-                        finally {
-                            leaveBusy();
-                        }
-                    }
-                });
-                cctx.io().addOrderedHandler(supplierTopic(cnt), new CI2<UUID, GridDhtPartitionDemandMessage>() {
-                    @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                        if (!enterBusy())
-                            return;
-
-                        try {
-                            cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(id, m);
+                            if (m instanceof GridDhtPartitionSupplyMessageV2)
+                                cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(
+                                    idx, id, (GridDhtPartitionSupplyMessageV2)m);
+                            else if (m instanceof GridDhtPartitionDemandMessage)
+                                cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(
+                                    id, (GridDhtPartitionDemandMessage)m);
+                            else
+                                log.error("Unsupported message type " + m.getClass().getName());
                         }
                         finally {
                             leaveBusy();
@@ -415,16 +409,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param idx
      * @return topic
      */
-    public static Object demanderTopic(int idx) {
-        return TOPIC_CACHE.topic("Demander", idx);
-    }
-
-    /**
-     * @param idx
-     * @return topic
-     */
-    public static Object supplierTopic(int idx) {
-        return TOPIC_CACHE.topic("Supplier", idx);
+    public static Object rebalanceTopic(int idx) {
+        return TOPIC_CACHE.topic("Rebalance", idx);
     }
 
     /** {@inheritDoc} */
@@ -451,9 +437,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         for (AffinityReadyFuture f : readyFuts.values())
             f.onDone(err);
 
-        for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2); cnt++) {
-            cctx.io().removeOrderedHandler(demanderTopic(cnt));
-            cctx.io().removeOrderedHandler(supplierTopic(cnt));
+        for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize()); cnt++) {
+            cctx.io().removeOrderedHandler(rebalanceTopic(cnt));
         }
 
         U.cancel(exchWorker);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce23c05c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 32595f2..b55e626 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -238,7 +238,7 @@ public class GridDhtPartitionDemander {
                 fut.init(assigns);
 
             if (assigns.isEmpty()) {
-                fut.onDone();
+                fut.checkIsDone();
 
                 return;
             }
@@ -401,7 +401,7 @@ public class GridDhtPartitionDemander {
 
                 fut.append(node.id(), remainings);
 
-                int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
+                int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize());
 
                 List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
 
@@ -422,11 +422,10 @@ public class GridDhtPartitionDemander {
                         // Create copy.
                         GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
-                        initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
-
+                        initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
                         try {
                             if (!topologyChanged(topVer))
-                                cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
+                                cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
                             else
                                 fut.onCancel();
                         }
@@ -633,11 +632,11 @@ public class GridDhtPartitionDemander {
                     GridDhtPartitionDemandMessage nextD =
                         new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
 
-                    nextD.topic(GridCachePartitionExchangeManager.demanderTopic(idx));
+                    nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
                     if (!topologyChanged(topVer)) {
                         // Send demand message.
-                        cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(idx),
+                        cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
                             nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
                     }
                     else
@@ -987,7 +986,7 @@ public class GridDhtPartitionDemander {
         /**
          *
          */
-        private void checkIsDone() {
+        public void checkIsDone() {
             if (remaining.isEmpty()) {
                 if (log.isDebugEnabled())
                     log.debug("Completed sync future.");
@@ -1008,7 +1007,7 @@ public class GridDhtPartitionDemander {
 
                 cctx.shared().exchange().scheduleResendPartitions();
 
-                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED))
+                if (!cctx.isReplicated() || cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED))
                     preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
 
                 if (lsnr != null)