You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/07/03 01:07:05 UTC

[14/34] incubator-ignite git commit: IGNITE-621 - Fixing remap logic.

IGNITE-621 - Fixing remap logic.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c2c90b52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c2c90b52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c2c90b52

Branch: refs/heads/ignite-1026
Commit: c2c90b52972bec53919d97ec07d2aeab4d0d55e8
Parents: a9d0662
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Jun 25 17:06:31 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Jun 25 17:06:31 2015 -0700

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  3 +
 .../processors/cache/GridCacheAdapter.java      |  2 +-
 .../processors/cache/GridCacheAtomicFuture.java | 12 ++-
 .../processors/cache/GridCacheMvccManager.java  |  8 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 12 ++-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 88 ++++++++++++++++++--
 .../communication/tcp/TcpCommunicationSpi.java  |  2 +-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |  7 +-
 8 files changed, 110 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 542fa30..40fc873 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -343,6 +343,9 @@ public final class IgniteSystemProperties {
     /** Maximum size for affinity assignment history. */
     public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE";
 
+    /** Number of cache operation retries in case of topology exceptions. */
+    public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index f993527..e138520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -79,7 +79,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;
 
     /** Maximum number of retries when topology changes. */
-    public static final int MAX_RETRIES = 100;
+    public static final int MAX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100);
 
     /** Deserialization stash. */
     private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 35d3ec5..8724d3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.affinity.*;
 
 import java.util.*;
@@ -26,14 +27,17 @@ import java.util.*;
  */
 public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
     /**
-     * @return {@code True} if partition exchange should wait for this future to complete.
+     * @return Future topology version.
      */
-    public boolean waitForPartitionExchange();
+    public AffinityTopologyVersion topologyVersion();
 
     /**
-     * @return Future topology version.
+     * Gets future that will be completed when it is safe when update is finished on the given version of topology.
+     *
+     * @param topVer Topology version to finish.
+     * @return Future or {@code null} if no need to wait.
      */
-    public AffinityTopologyVersion topologyVersion();
+    public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer);
 
     /**
      * @return Future keys.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index c528e08..f24cf01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -338,7 +338,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     public void addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) {
         IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut);
 
-        assert old == null;
+        assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
     }
 
     /**
@@ -1002,8 +1002,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class);
 
         for (GridCacheAtomicFuture<?> fut : atomicFuts.values()) {
-            if (fut.waitForPartitionExchange() && fut.topologyVersion().compareTo(topVer) < 0)
-                res.add((IgniteInternalFuture<Object>)fut);
+            IgniteInternalFuture<Void> complete = fut.completeFuture(topVer);
+
+            if (complete != null)
+                res.add((IgniteInternalFuture)complete);
         }
 
         res.markInitialized();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index ff8454e..37b57e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -170,13 +171,16 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     }
 
     /** {@inheritDoc} */
-    @Override public boolean waitForPartitionExchange() {
-        return waitForExchange;
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return updateReq.topologyVersion();
     }
 
     /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return updateReq.topologyVersion();
+    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+        if (waitForExchange && topologyVersion().compareTo(topVer) < 0)
+            return this;
+
+        return null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 536eb40..ea9b335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -105,7 +105,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     private final ExpiryPolicy expiryPlc;
 
     /** Future map topology version. */
-    private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+    private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+
+    /** Completion future for a particular topology version. */
+    private GridFutureAdapter<Void> topCompleteFut;
 
     /** Optional filter. */
     private final CacheEntryPredicate[] filter;
@@ -246,8 +249,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean waitForPartitionExchange() {
+    /**
+     * @return {@code True} if this future should block partition map exchange.
+     */
+    private boolean waitForPartitionExchange() {
         // Wait fast-map near atomic update futures in CLOCK mode.
         return fastMap;
     }
@@ -323,13 +328,36 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         else {
             topLocked = true;
 
-            // Cannot remap.
-            remapCnt.set(1);
+            synchronized (this) {
+                this.topVer = topVer;
+
+                // Cannot remap.
+                remapCnt.set(1);
+            }
 
             map0(topVer, null, false, null);
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+        if (waitForPartitionExchange() && topologyVersion().compareTo(topVer) < 0) {
+            synchronized (this) {
+                if (this.topVer == AffinityTopologyVersion.ZERO)
+                    return null;
+
+                if (this.topVer.compareTo(topVer) < 0) {
+                    if (topCompleteFut == null)
+                        topCompleteFut = new GridFutureAdapter<>();
+
+                    return topCompleteFut;
+                }
+            }
+        }
+
+        return null;
+    }
+
     /**
      * @param failed Keys to remap.
      */
@@ -339,14 +367,20 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
         Collection<Object> remapKeys = new ArrayList<>(failed.size());
         Collection<Object> remapVals = vals != null ? new ArrayList<>(failed.size()) : null;
+        Collection<GridCacheDrInfo> remapConflictPutVals = conflictPutVals != null ? new ArrayList<GridCacheDrInfo>(failed.size()) : null;
+        Collection<GridCacheVersion> remapConflictRmvVals = conflictRmvVals != null ? new ArrayList<GridCacheVersion>(failed.size()) : null;
 
         Iterator<?> keyIt = keys.iterator();
         Iterator<?> valsIt = vals != null ? vals.iterator() : null;
+        Iterator<GridCacheDrInfo> conflictPutValsIt = conflictPutVals != null ? conflictPutVals.iterator() : null;
+        Iterator<GridCacheVersion> conflictRmvValsIt = conflictRmvVals != null ? conflictRmvVals.iterator() : null;
 
         for (Object key : failed) {
             while (keyIt.hasNext()) {
                 Object nextKey = keyIt.next();
                 Object nextVal = valsIt != null ? valsIt.next() : null;
+                GridCacheDrInfo nextConflictPutVal = conflictPutValsIt != null ? conflictPutValsIt.next() : null;
+                GridCacheVersion nextConflictRmvVal = conflictRmvValsIt != null ? conflictRmvValsIt.next() : null;
 
                 if (F.eq(key, nextKey)) {
                     remapKeys.add(nextKey);
@@ -354,6 +388,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     if (remapVals != null)
                         remapVals.add(nextVal);
 
+                    if (remapConflictPutVals != null)
+                        remapConflictPutVals.add(nextConflictPutVal);
+
+                    if (remapConflictRmvVals != null)
+                        remapConflictRmvVals.add(nextConflictRmvVal);
+
                     break;
                 }
             }
@@ -361,13 +401,29 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
         keys = remapKeys;
         vals = remapVals;
+        conflictPutVals = remapConflictPutVals;
+        conflictRmvVals = remapConflictRmvVals;
 
-        mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
         single = null;
         futVer = null;
         err = null;
         opRes = null;
-        topVer = AffinityTopologyVersion.ZERO;
+
+        GridFutureAdapter<Void> fut0;
+
+        synchronized (this) {
+            mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
+
+            topVer = AffinityTopologyVersion.ZERO;
+
+            fut0 = topCompleteFut;
+
+            topCompleteFut = null;
+        }
+
+        if (fut0 != null)
+            fut0.onDone();
+
         singleNodeId = null;
         singleReq = null;
         fastMapRemap = false;
@@ -405,6 +461,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             if (futVer != null)
                 cctx.mvcc().removeAtomicFuture(version());
 
+            GridFutureAdapter<Void> fut0;
+
+            synchronized (this) {
+                fut0 = topCompleteFut;
+            }
+
+            if (fut0 != null)
+                fut0.onDone();
+
             return true;
         }
 
@@ -544,6 +609,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                 return;
             }
+
+            synchronized (this) {
+                this.topVer = topVer;
+            }
         }
         finally {
             cache.topology().readUnlock();
@@ -559,7 +628,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         boolean remap = false;
 
         synchronized (this) {
-            if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
+            if (topVer != AffinityTopologyVersion.ZERO &&
+                ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty())) {
                 CachePartialUpdateCheckedException err0 = err;
 
                 if (err0 != null)
@@ -1040,7 +1110,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         if (err0 == null)
             err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
 
-        List<Object> keys = new ArrayList<>(failedKeys.size());
+        Collection<Object> keys = new ArrayList<>(failedKeys.size());
 
         for (KeyCacheObject key : failedKeys)
             keys.add(key.value(cctx.cacheObjectContext(), false));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index addf243d..4ca2995 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -210,7 +210,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final int DFLT_ACK_SND_THRESHOLD = 16;
 
     /** Default socket write timeout. */
-    public static final long DFLT_SOCK_WRITE_TIMEOUT = 200;
+    public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
 
     /** No-op runnable. */
     private static final IgniteRunnable NOOP = new IgniteRunnable() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 054a110..b255558 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -41,6 +41,7 @@ import org.jsr166.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
 
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
@@ -236,6 +237,8 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
 
             System.err.println("FINISHED PUTS");
 
+            GridCacheMapEntry.debug = true;
+
             // Start put threads.
             IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() {
                 @Override public Object call() throws Exception {
@@ -340,12 +343,12 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
                         }
                         catch (AssertionError e) {
                             if (r == 9) {
-                                System.err.println("Failed to verify cache contents: " + e.getMessage());
+                                info("Failed to verify cache contents: " + e.getMessage());
 
                                 throw e;
                             }
 
-                            System.err.println("Failed to verify cache contents, will retry: " + e.getMessage());
+                            info("Failed to verify cache contents, will retry: " + e.getMessage());
 
                             // Give some time to finish async updates.
                             U.sleep(1000);