You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this text copy.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/22 13:13:40 UTC

[13/16] ignite git commit: 1093

1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93caa0b8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93caa0b8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93caa0b8

Branch: refs/heads/ignite-1093-2
Commit: 93caa0b8bb9eb4f5f667d0533c15ce7a9efcdc17
Parents: 2cb397a
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Sep 18 21:21:17 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Sep 18 21:21:17 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 11 ++-
 .../dht/preloader/GridDhtPartitionSupplier.java | 99 +++++++++-----------
 2 files changed, 52 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/93caa0b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 596ec2f..a2f8c01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -827,7 +827,7 @@ public class GridDhtPartitionDemander {
             lock.lock();
 
             try {
-                remaining.put(nodeId, new IgniteBiTuple<>(System.currentTimeMillis(), parts));
+                remaining.put(nodeId, new IgniteBiTuple<>(U.currentTimeMillis(), parts));
             }
             finally {
                 lock.unlock();
@@ -949,7 +949,8 @@ public class GridDhtPartitionDemander {
                     parts.remove(p);
 
                     if (parts.isEmpty()) {
-                        U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
+                        U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
+                            "rebalancing [cache=" + cctx.name() +
                             ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
                             ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
 
@@ -988,6 +989,9 @@ public class GridDhtPartitionDemander {
          */
         private void checkIsDone() {
             if (remaining.isEmpty()) {
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
+                    preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+
                 if (log.isDebugEnabled())
                     log.debug("Completed sync future.");
 
@@ -1006,9 +1010,6 @@ public class GridDhtPartitionDemander {
 
                 cctx.shared().exchange().scheduleResendPartitions();
 
-                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
-                    preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
                 onDone();
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/93caa0b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 50d64f9..1d8572a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -24,7 +24,6 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -41,12 +40,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T4;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jsr166.ConcurrentHashMap8;
 
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -72,7 +70,7 @@ class GridDhtPartitionSupplier {
     private IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
     /** Supply context map. */
-    private final ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+    private final ConcurrentHashMap8<T4, SupplyContext> scMap = new ConcurrentHashMap8<>();
 
     /** Rebalancing listener. */
     private GridLocalEventListener lsnr;
@@ -98,19 +96,8 @@ class GridDhtPartitionSupplier {
     void start() {
         lsnr = new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
-                if (evt instanceof CacheRebalancingEvent) {
-                    ClusterNode node = ((CacheRebalancingEvent)evt).discoveryNode();
-
-                    int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
-
-                    for (int idx = 0; idx < lsnrCnt; idx++) {
-                        T2<UUID, Integer> scId = new T2<>(node.id(), idx);
-
-                        tryClearContext(scMap, scId, log);
-                    }
-                }
-                else if (evt instanceof DiscoveryEvent) {
-                    scMap.clear();
+                if (evt instanceof DiscoveryEvent) {
+                    clearContexts(scMap, log, cctx);
                 }
                 else {
                     assert false;
@@ -118,7 +105,9 @@ class GridDhtPartitionSupplier {
             }
         };
 
-        cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED);
+        //todo: rebalance stopped.
+
+        cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
         startOldListeners();
     }
@@ -133,19 +122,33 @@ class GridDhtPartitionSupplier {
     }
 
     /**
-     * Clear context by id.
+     * Clear contexts.
      *
      * @param map Context map.
-     * @param scId Context id.
      * @param log Logger.
+     * @param cctx Context.
      */
-    private static void tryClearContext(
-        ConcurrentHashMap8<T2, SupplyContext> map,
-        T2<UUID, Integer> scId,
-        IgniteLogger log) {
-        SupplyContext sc = map.get(scId);
+    private static void clearContexts(
+        ConcurrentHashMap8<T4, SupplyContext> map, IgniteLogger log, GridCacheContext<?, ?> cctx) {
+        for (Map.Entry<T4, SupplyContext> entry : map.entrySet()) {
+            clearContext(map, entry.getKey(), entry.getValue(), log, cctx);
+        }
+    }
 
-        if (sc != null) {
+    /**
+     * Clear context.
+     *
+     * @param map Context map.
+     * @param log Logger.
+     */
+    private static boolean clearContext(
+        ConcurrentHashMap8<T4, SupplyContext> map,
+        T4 t,
+        SupplyContext sc,
+        IgniteLogger log,
+        GridCacheContext<?, ?> cctx) {
+
+        if (!t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null) {
             Iterator it = sc.entryIt;
 
             if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) {
@@ -159,9 +162,11 @@ class GridDhtPartitionSupplier {
                     log.error("Iterator close failed.", e);
                 }
             }
+
+            return map.remove(t, sc);
         }
 
-        map.remove(scId, sc);
+        return false;
     }
 
     /**
@@ -190,21 +195,16 @@ class GridDhtPartitionSupplier {
 
         ClusterNode node = cctx.discovery().node(id);
 
-        T2<UUID, Integer> scId = new T2<>(id, idx);
+        T4<UUID, Integer, AffinityTopologyVersion, Long> scId = new T4<>(id, idx, d.topologyVersion(), d.updateSequence());
 
         try {
             SupplyContext sctx = scMap.get(scId);
 
-            if (sctx == null) {
-                if (d.partitions().isEmpty())
-                    return;
-            }
-            else {
-                if (!sctx.top.equals(d.topologyVersion())) {
-                    tryClearContext(scMap, scId, log);
+            if (sctx == null && d.partitions().isEmpty())
+                return;
 
-                    sctx = scMap.get(scId);
-                }
+            if (sctx != null && !d.partitions().isEmpty()) {
+                assert false;
             }
 
             long bCnt = 0;
@@ -277,7 +277,7 @@ class GridDhtPartitionSupplier {
 
                             if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
                                 if (++bCnt >= maxBatchesCnt) {
-                                    saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr, d.topologyVersion());
+                                    saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
 
                                     swapLsnr = null;
 
@@ -321,8 +321,7 @@ class GridDhtPartitionSupplier {
                                 partIt,
                                 null,
                                 swapLsnr,
-                                part,
-                                d.topologyVersion());
+                                part);
                         }
                     }
 
@@ -353,7 +352,7 @@ class GridDhtPartitionSupplier {
 
                                 if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
                                     if (++bCnt >= maxBatchesCnt) {
-                                        saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr, d.topologyVersion());
+                                        saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
 
                                         swapLsnr = null;
 
@@ -436,8 +435,7 @@ class GridDhtPartitionSupplier {
                                 partIt,
                                 null,
                                 null,
-                                part,
-                                d.topologyVersion());
+                                part);
                         }
                     }
 
@@ -465,7 +463,7 @@ class GridDhtPartitionSupplier {
 
                             if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
                                 if (++bCnt >= maxBatchesCnt) {
-                                    saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr, d.topologyVersion());
+                                    saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
 
                                     swapLsnr = null;
 
@@ -557,13 +555,12 @@ class GridDhtPartitionSupplier {
      * @param swapLsnr Swap listener.
      */
     private void saveSupplyContext(
-        T2 t,
+        T4 t,
         int phase,
         Iterator<Integer> partIt,
         int part,
-        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr,
-        AffinityTopologyVersion top) {
-        scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part, top));
+        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) {
+        scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
     }
 
     /**
@@ -585,9 +582,6 @@ class GridDhtPartitionSupplier {
         /** Partition. */
         private final int part;
 
-        /** Topology version. */
-        private final AffinityTopologyVersion top;
-
         /**
          * @param phase Phase.
          * @param partIt Partition iterator.
@@ -596,13 +590,12 @@ class GridDhtPartitionSupplier {
          * @param part Partition.
          */
         public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
-            GridCacheEntryInfoCollectSwapListener swapLsnr, int part, AffinityTopologyVersion top) {
+            GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
             this.phase = phase;
             this.partIt = partIt;
             this.entryIt = entryIt;
             this.swapLsnr = swapLsnr;
             this.part = part;
-            this.top = top;
         }
     }