You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 18:59:58 UTC

[2/4] ignite git commit: 1093

1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ce1dd0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ce1dd0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ce1dd0a

Branch: refs/heads/ignite-1093-2
Commit: 5ce1dd0aca5d485f91dcd6f47798b46424436cce
Parents: e52367b
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 15 16:49:12 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 15 16:49:12 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 192 ++++++++++---------
 1 file changed, 102 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5ce1dd0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 31e2e5e..9960435 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -58,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -124,7 +123,7 @@ public class GridDhtPartitionDemander {
 
         boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
 
-        syncFut = new SyncFuture(null, cctx, log, true);
+        syncFut = new SyncFuture();//Dummy.
 
         if (!enabled)
             // Calling onDone() immediately since preloading is disabled.
@@ -132,13 +131,13 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     *
+     * Start.
      */
     void start() {
     }
 
     /**
-     *
+     * Stop.
      */
     void stop() {
         syncFut.cancel();
@@ -209,7 +208,7 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param name Name.
+     * @param name Cache name.
      * @param fut Future.
      */
     private void waitForCacheRebalancing(String name, SyncFuture fut) {
@@ -223,8 +222,6 @@ public class GridDhtPartitionDemander {
                 wFut.get();
             else {
                 fut.cancel();
-
-                return;
             }
         }
         catch (IgniteInterruptedCheckedException ignored) {
@@ -232,8 +229,6 @@ public class GridDhtPartitionDemander {
                 log.debug("Failed to wait for " + name + " cache rebalancing future (grid is stopping): " +
                     "[cacheName=" + cctx.name() + ']');
                 fut.cancel();
-
-                return;
             }
         }
         catch (IgniteCheckedException e) {
@@ -246,7 +241,7 @@ public class GridDhtPartitionDemander {
     /**
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException If failed.
      */
 
     void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException {
@@ -258,46 +253,37 @@ public class GridDhtPartitionDemander {
         if (delay == 0 || force) {
             assert assigns != null;
 
-            final AffinityTopologyVersion topVer = assigns.topologyVersion();
+            final SyncFuture oldFut = syncFut;
 
-            SyncFuture fut = syncFut;
-
-            if (fut.isInited()) {
-                if (!fut.isDone())
-                    fut.cancel();
+            final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy());
 
-                fut = new SyncFuture(assigns, cctx, log, false);
+            syncFut = fut;
 
-                syncFut = fut;
-            }
+            if (!oldFut.isDummy())
+                oldFut.cancel();
             else
-                fut.init(assigns);
-
-            if (assigns.isEmpty()) {
-                fut.checkIsDone();
-
-                if (fut.assigns.topologyVersion().topologyVersion() > 1)// Not a First node.
-                    U.log(log, "Rebalancing is not required [cache=" + cctx.name() +
-                        ", topology=" + fut.assigns.topologyVersion() + "]");
+                fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> future) {
+                        oldFut.onDone();
+                    }
+                });
 
+            if (fut.doneIfEmpty())// Done in case empty assigns.
                 return;
-            }
 
-            if (topologyChanged(topVer)) {
+            if (topologyChanged(fut.topologyVersion())) {
                 fut.cancel();
 
                 return;
             }
 
-            final SyncFuture curFut = fut;
-
             IgniteThread thread = new IgniteThread(cctx.gridName(), "demand-thread-" + cctx.cache().name(), new Runnable() {
                 @Override public void run() {
                     if (!CU.isMarshallerCache(cctx.name())) {
-                        waitForCacheRebalancing(GridCacheUtils.MARSH_CACHE_NAME, curFut);
+                        waitForCacheRebalancing(GridCacheUtils.MARSH_CACHE_NAME, fut);
 
                         if (!CU.isUtilityCache(cctx.name())) {
-                            waitForCacheRebalancing(GridCacheUtils.UTILITY_CACHE_NAME, curFut);
+                            waitForCacheRebalancing(GridCacheUtils.UTILITY_CACHE_NAME, fut);
                         }
                     }
 
@@ -312,10 +298,10 @@ public class GridDhtPartitionDemander {
                                     log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
                                         ", rebalanceOrder=" + rebalanceOrder + ']');
 
-                                if (!topologyChanged(topVer))
+                                if (!topologyChanged(fut.topologyVersion()))
                                     oFut.get();
                                 else {
-                                    curFut.cancel();
+                                    fut.cancel();
 
                                     return;
                                 }
@@ -325,19 +311,19 @@ public class GridDhtPartitionDemander {
                             if (log.isDebugEnabled()) {
                                 log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
                                     "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
-                                curFut.cancel();
+                                fut.cancel();
 
                                 return;
                             }
                         }
                         catch (IgniteCheckedException e) {
-                            curFut.cancel();
+                            fut.cancel();
 
                             throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
                         }
                     }
 
-                    requestPartitions(curFut);
+                    requestPartitions(fut);
                 }
             });
 
@@ -394,15 +380,12 @@ public class GridDhtPartitionDemander {
             final CacheConfiguration cfg = cctx.config();
 
             U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
-                ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+                ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
+                ", topology=" + d.topologyVersion() + "]");
 
             //Check remote node rebalancing API version.
             if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
-                GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
-
-                remainings.addAll(d.partitions());
-
-                fut.appendPartitions(node.id(), remainings);
+                fut.appendPartitions(node.id(), d.partitions());
 
                 int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
 
@@ -627,24 +610,22 @@ public class GridDhtPartitionDemander {
             for (Integer miss : supply.missed())
                 fut.partitionDone(id, miss);
 
-            if (!fut.isDone()) {
-                GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
+            GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
 
-                if (d != null) {
-                    // Create copy.
-                    GridDhtPartitionDemandMessage nextD =
-                        new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+            if (d != null) {
+                // Create copy.
+                GridDhtPartitionDemandMessage nextD =
+                    new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
 
-                    nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
+                nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
-                    if (!topologyChanged(topVer)) {
-                        // Send demand message.
-                        cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
-                            nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
-                    }
-                    else
-                        fut.cancel();
+                if (!topologyChanged(topVer)) {
+                    // Send demand message.
+                    cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+                        nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
                 }
+                else
+                    fut.cancel();
             }
         }
         catch (ClusterTopologyCheckedException e) {
@@ -759,12 +740,12 @@ public class GridDhtPartitionDemander {
     /**
      *
      */
-    public static class SyncFuture extends GridFutureAdapter<Boolean> {
+    public static class SyncFuture extends GridFutureAdapter<Object> {
         /** */
         private static final long serialVersionUID = 1L;
 
         /** Should EVT_CACHE_REBALANCE_STOPPED event be sent of not. */
-        private final boolean sendStopEvnt;
+        private final boolean sendStoppedEvnt;
 
         /** */
         private final GridCacheContext<?, ?> cctx;
@@ -782,13 +763,13 @@ public class GridDhtPartitionDemander {
         private final Lock lock = new ReentrantLock();
 
         /** Assignments. */
-        private volatile GridDhtPreloaderAssignments assigns;
-
-        /** Completed. */
-        private volatile boolean completed = true;
+        private final GridDhtPreloaderAssignments assigns;
 
         /**
          * @param assigns Assigns.
+         * @param cctx Context.
+         * @param log Logger.
+         * @param sentStopEvnt Whether EVT_CACHE_REBALANCE_STOPPED event should be sent.
          */
         SyncFuture(GridDhtPreloaderAssignments assigns,
             GridCacheContext<?, ?> cctx,
@@ -797,35 +778,39 @@ public class GridDhtPartitionDemander {
             this.assigns = assigns;
             this.cctx = cctx;
             this.log = log;
-            this.sendStopEvnt = sentStopEvnt;
+            this.sendStoppedEvnt = sentStopEvnt;
+
+            if (assigns != null)
+                cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
+                    new CI1<IgniteInternalFuture<Long>>() {
+                        @Override public void apply(IgniteInternalFuture<Long> future) {
+                            SyncFuture.this.cancel();
+                        }
+                    });
         }
 
         /**
-         * @return Topology version.
+         * Dummy future. Will be done by real one.
          */
-        public AffinityTopologyVersion topologyVersion() {
-            return assigns != null ? assigns.topologyVersion() : null;
+        public SyncFuture() {
+            this.assigns = null;
+            this.cctx = null;
+            this.log = null;
+            this.sendStoppedEvnt = false;
         }
 
         /**
-         * @param assigns Assigns.
+         * @return Topology version.
          */
-        private void init(GridDhtPreloaderAssignments assigns) {
-            this.assigns = assigns;
-
-            cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
-                new CI1<IgniteInternalFuture<Long>>() {
-                    @Override public void apply(IgniteInternalFuture<Long> future) {
-                        SyncFuture.this.cancel();
-                    }
-                });
+        public AffinityTopologyVersion topologyVersion() {
+            return assigns != null ? assigns.topologyVersion() : null;
         }
 
         /**
-         * @return Initialised or not.
+         * @return {@code True} if this is a dummy future (created at demander creation).
          */
-        private boolean isInited() {
-            return assigns != null;
+        private boolean isDummy() {
+            return assigns == null;
         }
 
         /**
@@ -845,6 +830,7 @@ public class GridDhtPartitionDemander {
 
         /**
          * @param node Node.
+         * @return Demand message.
          */
         private GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
             if (isDone())
@@ -854,6 +840,36 @@ public class GridDhtPartitionDemander {
         }
 
         /**
+         * @return {@code True} if the future is already done or assignments are empty.
+         */
+        private boolean doneIfEmpty() {
+            lock.lock();
+
+            try {
+                if (isDone())
+                    return true;
+
+                if (assigns.isEmpty()) {
+                    assert remaining.isEmpty();
+
+                    if (assigns.topologyVersion().topologyVersion() > 1)// Not an initial topology.
+                        U.log(log, "Rebalancing is not required [cache=" + cctx.name() +
+                            ", topology=" + assigns.topologyVersion() + "]");
+
+                    checkIsDone();
+
+                    return true;
+                }
+                else {
+                    return false;
+                }
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /**
          * Cancels this future.
          *
          * @return {@code true}.
@@ -867,8 +883,6 @@ public class GridDhtPartitionDemander {
 
                 remaining.clear();
 
-                completed = false;
-
                 U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
                     + ", topology=" + topologyVersion());
 
@@ -892,13 +906,11 @@ public class GridDhtPartitionDemander {
                     return;
 
                 U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
-                    ", from node=" + nodeId + ", topology=" + topologyVersion() +
+                    ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
                     ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
 
                 remaining.remove(nodeId);
 
-                completed = false;
-
                 checkIsDone();
             }
             finally {
@@ -919,7 +931,7 @@ public class GridDhtPartitionDemander {
                     return;
 
                 if (missed.get(nodeId) == null)
-                    missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+                    missed.put(nodeId, new HashSet<Integer>());
 
                 missed.get(nodeId).add(p);
             }
@@ -950,7 +962,7 @@ public class GridDhtPartitionDemander {
 
                     if (parts.isEmpty()) {
                         U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
-                            ", from node=" + nodeId + ", topology=" + topologyVersion() +
+                            ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
                             ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
 
                         remaining.remove(nodeId);
@@ -1007,10 +1019,10 @@ public class GridDhtPartitionDemander {
 
                 cctx.shared().exchange().scheduleResendPartitions();
 
-                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStopEvnt))
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
                     preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
 
-                onDone(completed);
+                onDone();
             }
         }
     }