You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/02 12:58:58 UTC

[44/46] ignite git commit: 1093

1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9abfc606
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9abfc606
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9abfc606

Branch: refs/heads/ignite-1093-2
Commit: 9abfc60694ddb2cc4dc11cd41124c79c09d083dd
Parents: e1651dd
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Sep 28 14:39:52 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Sep 28 14:39:52 2015 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionDemandMessage.java          |  7 ++
 .../dht/preloader/GridDhtPartitionDemander.java | 68 ++++++++++++--------
 .../dht/preloader/GridDhtPartitionSupplier.java |  7 +-
 3 files changed, 53 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9abfc606/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 06ac54b..6c60930 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -123,6 +123,13 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
     }
 
     /**
+     * @param updateSeq Update sequence.
+     */
+    void updateSequence(long updateSeq) {
+        this.updateSeq = updateSeq;
+    }
+
+    /**
      * @return Update sequence.
      */
     long updateSequence() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9abfc606/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 345e3bd..5d4db40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -64,7 +64,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -110,6 +110,9 @@ public class GridDhtPartitionDemander {
     /** Demand lock. */
     private final ReadWriteLock demandLock;
 
+    /** Rebalancing iteration counter. */
+    private long updateSeq = 0;
+
     /**
      * @param cctx Cctx.
      * @param demandLock Demand lock.
@@ -194,7 +197,10 @@ public class GridDhtPartitionDemander {
      * @return {@code True} if topology changed.
      */
     private boolean topologyChanged(SyncFuture fut) {
-        return !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || fut != syncFut;
+        return
+            !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed.
+            fut != syncFut || // Same topology, but dummy exchange forced because of missing partitions.
+            cctx.shared().exchange().hasPendingExchange(); // New topology pending.
     }
 
     /**
@@ -256,20 +262,20 @@ public class GridDhtPartitionDemander {
 
             final SyncFuture oldFut = syncFut;
 
-            if (!oldFut.isDummy() && assigns.topologyVersion().compareTo(oldFut.topologyVersion()) < 0) {
-                U.log(log, "Skipping obsolete (dummy) exchange. [top=" + assigns.topologyVersion() + "]");
+            if (cctx.shared().exchange().hasPendingExchange()) { // Will rebalance at actual topology.
+                U.log(log, "Skipping obsolete exchange. [top=" + assigns.topologyVersion() + "]");
 
                 return;
             }
 
-            final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy());
+            final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy(), ++updateSeq);
 
             if (!oldFut.isDummy())
                 oldFut.cancel();
             else
-                fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(IgniteInternalFuture<?> future) {
-                        oldFut.onDone();
+                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                    @Override public void apply(IgniteInternalFuture<Boolean> future) {
+                        oldFut.onDone(fut.result());
                     }
                 });
 
@@ -385,13 +391,13 @@ public class GridDhtPartitionDemander {
 
             final CacheConfiguration cfg = cctx.config();
 
-            U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
-                ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
-                ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + "]");
-
             //Check remote node rebalancing API version.
             if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
-                fut.appendPartitions(node.id(), d.partitions(), d.updateSequence());
+                U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                    ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
+                    ", topology=" + d.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
+
+                fut.appendPartitions(node.id(), d.partitions());
 
                 int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
 
@@ -415,6 +421,7 @@ public class GridDhtPartitionDemander {
                         GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
                         initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
+                        initD.updateSequence(fut.updateSeq);
 
                         try {
                             if (!topologyChanged(fut)) {
@@ -442,9 +449,13 @@ public class GridDhtPartitionDemander {
                 }
             }
             else {
+                U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                    ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
+                    ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + "]");
+
                 DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
 
-                fut.appendPartitions(node.id(), d.partitions(), d.updateSequence());
+                fut.appendPartitions(node.id(), d.partitions());
 
                 dw.run(node, d);
             }
@@ -516,7 +527,9 @@ public class GridDhtPartitionDemander {
 
         assert node != null;
 
-        if (!fut.topologyVersion().equals(topVer) || topologyChanged(fut) || !fut.isActual(id, supply.updateSequence()))
+        if (!fut.topologyVersion().equals(topVer) || // Current future based on another topology.
+            topologyChanged(fut) || // Topology already changed (for current future) or new topology pending.
+            !fut.isActual(supply.updateSequence())) // Current future has the same topology, but a different update sequence.
             return;
 
         if (log.isDebugEnabled())
@@ -761,7 +774,7 @@ public class GridDhtPartitionDemander {
         private final IgniteLogger log;
 
         /** Remaining. T3: startTime, partitions, updateSequence */
-        private final Map<UUID, T3<Long, Collection<Integer>, Long>> remaining = new HashMap<>();
+        private final Map<UUID, T2<Long, Collection<Integer>>> remaining = new HashMap<>();
 
         /** Missed. */
         private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
@@ -776,6 +789,9 @@ public class GridDhtPartitionDemander {
         /** Topology version. */
         private final AffinityTopologyVersion topVer;
 
+        /** Unique (per demander) sequence id. */
+        private final long updateSeq;
+
         /**
          * @param assigns Assigns.
          * @param cctx Context.
@@ -785,7 +801,8 @@ public class GridDhtPartitionDemander {
         SyncFuture(GridDhtPreloaderAssignments assigns,
             GridCacheContext<?, ?> cctx,
             IgniteLogger log,
-            boolean sentStopEvnt) {
+            boolean sentStopEvnt,
+            long updateSeq) {
             assert assigns != null;
 
             this.exchFut = assigns.exchangeFuture();
@@ -793,6 +810,7 @@ public class GridDhtPartitionDemander {
             this.cctx = cctx;
             this.log = log;
             this.sendStoppedEvnt = sentStopEvnt;
+            this.updateSeq = updateSeq;
 
             if (assigns != null)
                 cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
@@ -812,6 +830,7 @@ public class GridDhtPartitionDemander {
             this.cctx = null;
             this.log = null;
             this.sendStoppedEvnt = false;
+            this.updateSeq = -1;
         }
 
         /**
@@ -822,14 +841,11 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         * @param nodeId Node id.
          * @param updateSeq Update sequence.
          * @return true in case future created for specified updateSeq, false in other case.
          */
-        private boolean isActual(UUID nodeId, long updateSeq) {
-            T3<Long, Collection<Integer>, Long> t = remaining.get(nodeId);
-
-            return t != null ? t.get3().equals(updateSeq) : false;
+        private boolean isActual(long updateSeq) {
+            return this.updateSeq == updateSeq;
         }
 
         /**
@@ -843,11 +859,11 @@ public class GridDhtPartitionDemander {
          * @param nodeId Node id.
          * @param parts Parts.
          */
-        private void appendPartitions(UUID nodeId, Collection<Integer> parts, long updateSeq) {
+        private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
             lock.lock();
 
             try {
-                remaining.put(nodeId, new T3<>(U.currentTimeMillis(), parts, updateSeq));
+                remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
             }
             finally {
                 lock.unlock();
@@ -1027,10 +1043,10 @@ public class GridDhtPartitionDemander {
                     if (!m.isEmpty()) {
                         U.log(log, ("Reassigning partitions that were missed: " + m));
 
-                        cctx.shared().exchange().forceDummyExchange(true, exchFut);
-
                         onDone(false); //Finished but has missed partitions and forced dummy exchange
 
+                        cctx.shared().exchange().forceDummyExchange(true, exchFut);
+
                         return;
                     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9abfc606/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index e23a50b..81e2fa4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -105,7 +105,7 @@ class GridDhtPartitionSupplier {
 
                         SupplyContext sc = entry.getValue();
 
-                       if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null)
+                        if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null)
                             clearContext(scMap, t, sc, log);
                     }
                 }
@@ -128,7 +128,7 @@ class GridDhtPartitionSupplier {
             }
         };
 
-        cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED);
+        cctx.events().addListener(lsnr, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED);
 
         startOldListeners();
     }
@@ -532,7 +532,8 @@ class GridDhtPartitionSupplier {
             if (log.isDebugEnabled())
                 log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
 
-            if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+            if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()) || // Topology already changed.
+                cctx.shared().exchange().hasPendingExchange()) // New topology pending.
                 return true;
 
             cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());