You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/22 13:13:38 UTC

[11/16] ignite git commit: 1093

1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9e0eafed
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9e0eafed
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9e0eafed

Branch: refs/heads/ignite-1093-2
Commit: 9e0eafed477a3e09e4527f436be3de57d59c9848
Parents: 8c1aa26
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Sep 18 16:52:38 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Sep 18 16:52:38 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 126 +++++++++----------
 .../dht/preloader/GridDhtPartitionSupplier.java |   8 +-
 .../GridDhtPartitionSupplyMessageV2.java        |  70 +++++------
 3 files changed, 89 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9e0eafed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 498b16d..596ec2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
@@ -189,11 +190,11 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param topVer Topology version.
+     * @param fut Sync future; also reported as changed when it is no longer the current {@code syncFut}.
      * @return {@code True} if topology changed.
      */
-    private boolean topologyChanged(AffinityTopologyVersion topVer) {
-        return !cctx.affinity().affinityTopologyVersion().equals(topVer);
+    private boolean topologyChanged(SyncFuture fut) {
+        return !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || fut != syncFut;
     }
 
     /**
@@ -218,7 +219,7 @@ public class GridDhtPartitionDemander {
         try {
             SyncFuture wFut = (SyncFuture)cctx.kernalContext().cache().internalCache(name).preloader().syncFuture();
 
-            if (!topologyChanged(fut.assigns.topologyVersion()))
+            if (!topologyChanged(fut))
                 wFut.get();
             else {
                 fut.cancel();
@@ -257,8 +258,6 @@ public class GridDhtPartitionDemander {
 
             final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy());
 
-            syncFut = fut;
-
             if (!oldFut.isDummy())
                 oldFut.cancel();
             else
@@ -268,10 +267,15 @@ public class GridDhtPartitionDemander {
                     }
                 });
 
-            if (fut.doneIfEmpty())// Done in case empty assigns.
+            syncFut = fut;
+
+            if (assigns.isEmpty()) {
+                fut.doneIfEmpty();
+
                 return;
+            }
 
-            if (topologyChanged(fut.topologyVersion())) {
+            if (topologyChanged(fut)) {
                 fut.cancel();
 
                 return;
@@ -298,7 +302,7 @@ public class GridDhtPartitionDemander {
                                     log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
                                         ", rebalanceOrder=" + rebalanceOrder + ']');
 
-                                if (!topologyChanged(fut.topologyVersion()))
+                                if (!topologyChanged(fut))
                                     oFut.get();
                                 else {
                                     fut.cancel();
@@ -323,7 +327,7 @@ public class GridDhtPartitionDemander {
                         }
                     }
 
-                    requestPartitions(fut);
+                    requestPartitions(fut, assigns);
                 }
             });
 
@@ -358,13 +362,9 @@ public class GridDhtPartitionDemander {
     /**
      * @param fut Future.
      */
-    private void requestPartitions(SyncFuture fut) {
-        final GridDhtPreloaderAssignments assigns = fut.assigns;
-
-        AffinityTopologyVersion topVer = fut.topologyVersion();
-
+    private void requestPartitions(SyncFuture fut, GridDhtPreloaderAssignments assigns) {
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
-            if (topologyChanged(topVer)) {
+            if (topologyChanged(fut)) {
                 fut.cancel();
 
                 return;
@@ -411,7 +411,7 @@ public class GridDhtPartitionDemander {
                         initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
 
                         try {
-                            if (!topologyChanged(topVer))
+                            if (!topologyChanged(fut))
                                 cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
                             else
                                 fut.cancel();
@@ -498,16 +498,12 @@ public class GridDhtPartitionDemander {
 
         final SyncFuture fut = syncFut;
 
-        if (!fut.topologyVersion().equals(topVer))//will check topology changed at loop.
-            return;
-
         ClusterNode node = cctx.node(id);
 
-        if (node == null) {
-            fut.cancel(id);
+        assert node != null;
 
+        if (!fut.topologyVersion().equals(topVer) || topologyChanged(fut))
             return;
-        }
 
         if (log.isDebugEnabled())
             log.debug("Received supply message: " + supply);
@@ -527,7 +523,7 @@ public class GridDhtPartitionDemander {
         try {
             // Preload.
             for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
-                if (topologyChanged(topVer)) {
+                if (topologyChanged(fut)) {
                     fut.cancel();
 
                     return;
@@ -609,7 +605,10 @@ public class GridDhtPartitionDemander {
             for (Integer miss : supply.missed())
                 fut.partitionDone(id, miss);
 
-            GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
+            GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
+                supply.updateSequence(), supply.topologyVersion(), cctx.cacheId());
+
+            d.timeout(cctx.config().getRebalanceTimeout());
 
             if (d != null) {
                 // Create copy.
@@ -618,7 +617,7 @@ public class GridDhtPartitionDemander {
 
                 nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
-                if (!topologyChanged(topVer)) {
+                if (!topologyChanged(fut)) {
                     // Send demand message.
                     cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
                         nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
@@ -761,8 +760,12 @@ public class GridDhtPartitionDemander {
         /** Lock. */
         private final Lock lock = new ReentrantLock();
 
-        /** Assignments. */
-        private final GridDhtPreloaderAssignments assigns;
+        /** Exchange future. */
+        @GridToStringExclude
+        private final GridDhtPartitionsExchangeFuture exchFut;
+
+        /** Topology version. */
+        private final AffinityTopologyVersion topVer;
 
         /**
          * @param assigns Assigns.
@@ -774,7 +777,10 @@ public class GridDhtPartitionDemander {
             GridCacheContext<?, ?> cctx,
             IgniteLogger log,
             boolean sentStopEvnt) {
-            this.assigns = assigns;
+            assert assigns != null;
+
+            this.exchFut = assigns.exchangeFuture();
+            this.topVer = assigns.topologyVersion();
             this.cctx = cctx;
             this.log = log;
             this.sendStoppedEvnt = sentStopEvnt;
@@ -792,7 +798,8 @@ public class GridDhtPartitionDemander {
          * Dummy future. Will be done by real one.
          */
         public SyncFuture() {
-            this.assigns = null;
+            this.exchFut = null;
+            this.topVer = null;
             this.cctx = null;
             this.log = null;
             this.sendStoppedEvnt = false;
@@ -802,14 +809,14 @@ public class GridDhtPartitionDemander {
          * @return Topology version.
          */
         public AffinityTopologyVersion topologyVersion() {
-            return assigns != null ? assigns.topologyVersion() : null;
+            return topVer;
         }
 
         /**
          * @return Is dummy (created at demander creation).
          */
         private boolean isDummy() {
-            return assigns == null;
+            return topVer == null;
         }
 
         /**
@@ -828,41 +835,22 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         * @param node Node.
-         * @return Demand message.
-         */
-        private GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
-            if (isDone())
-                return null;
-
-            return assigns.get(node);
-        }
-
-        /**
-         * @return future is done.
+         * Handles empty assignments: logs that rebalancing is not required and runs the completion check.
          */
-        private boolean doneIfEmpty() {
+        private void doneIfEmpty() {
             lock.lock();
 
             try {
                 if (isDone())
-                    return true;
-
-                if (assigns.isEmpty()) {
-                    assert remaining.isEmpty();
+                    return;
 
-                    if (assigns.topologyVersion().topologyVersion() > 1)// Not an initial topology.
-                        if (log.isDebugEnabled())
-                            log.debug("Rebalancing is not required [cache=" + cctx.name() +
-                            ", topology=" + assigns.topologyVersion() + "]");
+                assert remaining.isEmpty();
 
-                    checkIsDone();
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing is not required [cache=" + cctx.name() +
+                        ", topology=" + topVer + "]");
 
-                    return true;
-                }
-                else {
-                    return false;
-                }
+                checkIsDone();
             }
             finally {
                 lock.unlock();
@@ -953,7 +941,7 @@ public class GridDhtPartitionDemander {
 
                 if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
                     preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
-                        assigns.exchangeFuture().discoveryEvent());
+                        exchFut.discoveryEvent());
 
                 Collection<Integer> parts = remaining.get(nodeId).get2();
 
@@ -1011,15 +999,15 @@ public class GridDhtPartitionDemander {
                 }
 
                 if (!m.isEmpty()) {
-                    U.log(log,("Reassigning partitions that were missed: " + m));
+                    U.log(log, ("Reassigning partitions that were missed: " + m));
 
-                    cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
+                    cctx.shared().exchange().forceDummyExchange(true, exchFut);
                 }
 
                 cctx.shared().exchange().scheduleResendPartitions();
 
                 if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
-                    preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
+                    preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
 
                 onDone();
             }
@@ -1163,7 +1151,7 @@ public class GridDhtPartitionDemander {
             // Get the same collection that will be sent in the message.
             Collection<Integer> remaining = d.partitions();
 
-            if (topologyChanged(topVer))
+            if (topologyChanged(fut))
                 return missed;
 
             cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
@@ -1195,7 +1183,7 @@ public class GridDhtPartitionDemander {
 
                     // While.
                     // =====
-                    while (!topologyChanged(topVer)) {
+                    while (!topologyChanged(fut)) {
                         SupplyMessage s = poll(msgQ, timeout);
 
                         // If timed out.
@@ -1358,7 +1346,7 @@ public class GridDhtPartitionDemander {
                         }
                     }
                 }
-                while (retry && !topologyChanged(topVer));
+                while (retry && !topologyChanged(fut));
 
                 return missed;
             }
@@ -1375,13 +1363,13 @@ public class GridDhtPartitionDemander {
             demandLock.readLock().lock();
 
             try {
-                GridDhtPartitionsExchangeFuture exchFut = fut.assigns.exchangeFuture();
+                GridDhtPartitionsExchangeFuture exchFut = fut.exchFut;
 
-                AffinityTopologyVersion topVer = fut.assigns.topologyVersion();
+                AffinityTopologyVersion topVer = fut.topVer;
 
                 Collection<Integer> missed = new HashSet<>();
 
-                if (topologyChanged(topVer)) {
+                if (topologyChanged(fut)) {
                     fut.cancel();
 
                     return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e0eafed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index ee01158..0641612 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -185,7 +185,7 @@ class GridDhtPartitionSupplier {
         if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
             return;
 
-        GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),
+        GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(
             d.updateSequence(), cctx.cacheId(), d.topologyVersion());
 
         ClusterNode node = cctx.discovery().node(id);
@@ -289,7 +289,7 @@ class GridDhtPartitionSupplier {
                                     if (!reply(node, d, s))
                                         return;
 
-                                    s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+                                    s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
                                         cctx.cacheId(), d.topologyVersion());
                                 }
                             }
@@ -365,7 +365,7 @@ class GridDhtPartitionSupplier {
                                         if (!reply(node, d, s))
                                             return;
 
-                                        s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+                                        s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
                                             cctx.cacheId(), d.topologyVersion());
                                     }
                                 }
@@ -477,7 +477,7 @@ class GridDhtPartitionSupplier {
                                     if (!reply(node, d, s))
                                         return;
 
-                                    s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+                                    s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
                                         cctx.cacheId(), d.topologyVersion());
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e0eafed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
index 01056ac..17ebb26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -17,17 +17,30 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-import java.util.*;
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
  * Partition supply message.
@@ -36,9 +49,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Worker ID. */
-    private int workerId = -1;
-
     /** Update sequence. */
     private long updateSeq;
 
@@ -66,17 +76,14 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
     private int msgSize;
 
     /**
-     * @param workerId Worker ID.
      * @param updateSeq Update sequence for this node.
      * @param cacheId Cache ID.
      */
-    GridDhtPartitionSupplyMessageV2(int workerId, long updateSeq, int cacheId, AffinityTopologyVersion topVer) {
-        assert workerId >= 0;
+    GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer) {
         assert updateSeq > 0;
 
         this.cacheId = cacheId;
         this.updateSeq = updateSeq;
-        this.workerId = workerId;
         this.topVer = topVer;
     }
 
@@ -98,13 +105,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
     }
 
     /**
-     * @return Worker ID.
-     */
-    int workerId() {
-        return workerId;
-    }
-
-    /**
      * @return Update sequence.
      */
     long updateSequence() {
@@ -255,7 +255,7 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
         GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
 
         for (CacheEntryInfoCollection col : infos().values()) {
-            List<GridCacheEntryInfo>  entries = col.infos();
+            List<GridCacheEntryInfo> entries = col.infos();
 
             for (int i = 0; i < entries.size(); i++)
                 entries.get(i).unmarshal(cacheCtx, ldr);
@@ -320,12 +320,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
 
                 writer.incrementState();
 
-            case 9:
-                if (!writer.writeInt("workerId", workerId))
-                    return false;
-
-                writer.incrementState();
-
         }
 
         return true;
@@ -390,17 +384,9 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
 
                 reader.incrementState();
 
-            case 9:
-                workerId = reader.readInt("workerId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
         }
 
-        return true;
+        return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class);
     }
 
     /** {@inheritDoc} */
@@ -410,7 +396,7 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 10;
+        return 9;
     }
 
     /** {@inheritDoc} */