You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/01/11 12:54:27 UTC

ignite git commit: Must use finished exchange future to call validateCache.

Repository: ignite
Updated Branches:
  refs/heads/ignite-3477 3f4a2ee58 -> 949e4c0ac


Must use finished exchange future to call validateCache.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/949e4c0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/949e4c0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/949e4c0a

Branch: refs/heads/ignite-3477
Commit: 949e4c0ac6a9ebea877bbc336e95f5a8a5cc41f9
Parents: 3f4a2ee
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jan 11 15:54:30 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jan 11 15:54:30 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  7 +++--
 .../GridCachePartitionExchangeManager.java      | 33 +++++++++++++++++---
 .../dht/GridPartitionedGetFuture.java           |  4 ++-
 .../dht/GridPartitionedSingleGetFuture.java     |  4 ++-
 .../GridDhtPartitionsExchangeFuture.java        | 30 ++++++++++--------
 5 files changed, 56 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 93270ea..9356f31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -86,7 +86,7 @@ import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
@@ -1967,8 +1967,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             try {
                 int keysSize = keys.size();
 
-                Throwable ex = ctx.shared().exchange().lastTopologyFuture()
-                    .validateCache(ctx, recovery, /*read*/true, null, keys);
+                GridDhtTopologyFuture topFut = ctx.shared().exchange().lastFinishedFuture();
+
+                Throwable ex = topFut != null ? topFut.validateCache(ctx, recovery, /*read*/true, null, keys) : null;
 
                 if (ex != null)
                     return new GridFinishedFuture<>(ex);

http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1e7689f..46a20ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -145,6 +145,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private volatile GridDhtPartitionsExchangeFuture lastInitializedFut;
 
     /** */
+    private final AtomicReference<GridDhtTopologyFuture> lastFinishedFut = new AtomicReference<>();
+
+    /** */
     private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>();
 
     /** */
@@ -159,9 +162,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private GridFutureAdapter<?> reconnectExchangeFut;
 
     /** */
-    private final Queue<Callable<Boolean>> rebalanceQ = new ConcurrentLinkedDeque8<>();
-
-    /** */
     private final Object interruptLock = new Object();
 
     /**
@@ -610,13 +610,38 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @return Last completed topology future.
+     * @return Last initialized topology future.
      */
     public GridDhtTopologyFuture lastTopologyFuture() {
         return lastInitializedFut;
     }
 
     /**
+     * @return Last finished topology future.
+     */
+    @Nullable public GridDhtTopologyFuture lastFinishedFuture() {
+        return lastFinishedFut.get();
+    }
+
+    /**
+     * @param fut Finished future.
+     */
+    public void lastFinishedFuture(GridDhtTopologyFuture fut) {
+        assert fut != null && fut.isDone() : fut;
+
+        while (true) {
+            GridDhtTopologyFuture cur = lastFinishedFut.get();
+
+            if (cur == null || fut.topologyVersion().compareTo(cur.topologyVersion()) > 0) {
+                if (lastFinishedFut.compareAndSet(cur, fut))
+                    break;
+            }
+            else
+                break;
+        }
+    }
+
+    /**
      * @param ver Topology version.
      * @return Future or {@code null} is future is already completed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index cf329ef..6c4da68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -246,7 +246,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             return;
         }
 
-        Throwable err = cctx.topology().topologyVersionFuture().validateCache(cctx, recovery, true, null, keys);
+        GridDhtTopologyFuture topFut = cctx.shared().exchange().lastFinishedFuture();
+
+        Throwable err = topFut != null ? topFut.validateCache(cctx, recovery, true, null, keys) : null;
 
         if (err != null) {
             onDone(err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 1744cbd..ea69743 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -198,7 +198,9 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
         AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
             canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
 
-        Throwable err = cctx.topology().topologyVersionFuture().validateCache(cctx, recovery, true, key, null);
+        GridDhtTopologyFuture topFut = cctx.shared().exchange().lastFinishedFuture();
+
+        Throwable err = topFut != null ? topFut.validateCache(cctx, recovery, true, key, null) : null;
 
         if (err != null) {
             onDone(err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index b7f6680..75fd3e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -205,7 +205,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     private boolean centralizedAff;
 
     /** Change global state exception. */
-    private Exception changeGlobalStateException;
+    private Exception changeGlobalStateE;
 
     /** Change global state exceptions. */
     private final Map<UUID, Exception> changeGlobalStateExceptions = new ConcurrentHashMap8<>();
@@ -488,9 +488,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         assert !dummy && !forcePreload : this;
 
         try {
-            AffinityTopologyVersion topVersion = topologyVersion();
+            AffinityTopologyVersion topVer = topologyVersion();
 
-            srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topVersion));
+            srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topVer));
 
             remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
 
@@ -522,7 +522,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
             else {
                 if (discoEvt.type() == EVT_NODE_JOINED) {
-                    Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(topVersion);
+                    Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(topVer);
 
                     if (!discoEvt.eventNode().isLocal())
                         cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
@@ -568,7 +568,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 case NONE: {
                     initTopologies();
 
-                    onDone(topVersion);
+                    onDone(topVer);
 
                     break;
                 }
@@ -654,10 +654,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         GridClusterStateProcessor stateProc = cctx.kernalContext().state();
 
         if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(reqs, topologyVersion())) {
-            changeGlobalStateException = stateProc.onChangeGlobalState();
+            changeGlobalStateE = stateProc.onChangeGlobalState();
 
-            if (crd && changeGlobalStateException != null)
-                changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateException);
+            if (crd && changeGlobalStateE != null)
+                changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateE);
         }
 
         boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, reqs);
@@ -1044,8 +1044,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(
             node, exchangeId(), clientOnlyExchange, true);
 
-        if (exchangeOnChangeGlobalState && changeGlobalStateException != null)
-            m.setException(changeGlobalStateException);
+        if (exchangeOnChangeGlobalState && changeGlobalStateE != null)
+            m.setException(changeGlobalStateE);
 
         if (log.isDebugEnabled())
             log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
@@ -1199,6 +1199,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (discoEvt instanceof DiscoveryCustomEvent)
                 ((DiscoveryCustomEvent)discoEvt).customMessage(null);
 
+            cctx.exchange().lastFinishedFuture(this);
+
             return true;
         }
 
@@ -1213,6 +1215,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         @Nullable Object key,
         @Nullable Collection<?> keys
     ) {
+        assert isDone() : this;
+
         Throwable err = error();
 
         if (err != null)
@@ -1318,7 +1322,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         changeGlobalStateExceptions.clear();
         crd = null;
         partReleaseFut = null;
-        changeGlobalStateException = null;
+        changeGlobalStateE = null;
     }
 
     /**
@@ -1920,8 +1924,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                         }
 
                         if (crd0.isLocal()) {
-                            if (exchangeOnChangeGlobalState && changeGlobalStateException!=null)
-                                changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateException);
+                            if (exchangeOnChangeGlobalState && changeGlobalStateE !=null)
+                                changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE);
 
                             if (allReceived) {
                                 onAllReceived();