You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/03 11:19:58 UTC
[14/26] incubator-ignite git commit: IGNITE-621 - Fixing remap logic.
IGNITE-621 - Fixing remap logic.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c2c90b52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c2c90b52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c2c90b52
Branch: refs/heads/ignite-gg-10460
Commit: c2c90b52972bec53919d97ec07d2aeab4d0d55e8
Parents: a9d0662
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Jun 25 17:06:31 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Jun 25 17:06:31 2015 -0700
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 3 +
.../processors/cache/GridCacheAdapter.java | 2 +-
.../processors/cache/GridCacheAtomicFuture.java | 12 ++-
.../processors/cache/GridCacheMvccManager.java | 8 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 12 ++-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 88 ++++++++++++++++++--
.../communication/tcp/TcpCommunicationSpi.java | 2 +-
...eAtomicInvalidPartitionHandlingSelfTest.java | 7 +-
8 files changed, 110 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 542fa30..40fc873 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -343,6 +343,9 @@ public final class IgniteSystemProperties {
/** Maximum size for affinity assignment history. */
public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE";
+ /** Number of cache operation retries in case of topology exceptions. */
+ public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT";
+
/**
* Enforces singleton.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index f993527..e138520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -79,7 +79,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;
/** Maximum number of retries when topology changes. */
- public static final int MAX_RETRIES = 100;
+ public static final int MAX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100);
/** Deserialization stash. */
private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 35d3ec5..8724d3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.affinity.*;
import java.util.*;
@@ -26,14 +27,17 @@ import java.util.*;
*/
public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
/**
- * @return {@code True} if partition exchange should wait for this future to complete.
+ * @return Future topology version.
*/
- public boolean waitForPartitionExchange();
+ public AffinityTopologyVersion topologyVersion();
/**
- * @return Future topology version.
+ * Gets future that will be completed when it is safe when update is finished on the given version of topology.
+ *
+ * @param topVer Topology version to finish.
+ * @return Future or {@code null} if no need to wait.
*/
- public AffinityTopologyVersion topologyVersion();
+ public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer);
/**
* @return Future keys.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index c528e08..f24cf01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -338,7 +338,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
public void addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) {
IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut);
- assert old == null;
+ assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
}
/**
@@ -1002,8 +1002,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class);
for (GridCacheAtomicFuture<?> fut : atomicFuts.values()) {
- if (fut.waitForPartitionExchange() && fut.topologyVersion().compareTo(topVer) < 0)
- res.add((IgniteInternalFuture<Object>)fut);
+ IgniteInternalFuture<Void> complete = fut.completeFuture(topVer);
+
+ if (complete != null)
+ res.add((IgniteInternalFuture)complete);
}
res.markInitialized();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index ff8454e..37b57e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
@@ -170,13 +171,16 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
}
/** {@inheritDoc} */
- @Override public boolean waitForPartitionExchange() {
- return waitForExchange;
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return updateReq.topologyVersion();
}
/** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersion() {
- return updateReq.topologyVersion();
+ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ if (waitForExchange && topologyVersion().compareTo(topVer) < 0)
+ return this;
+
+ return null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 536eb40..ea9b335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -105,7 +105,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
private final ExpiryPolicy expiryPlc;
/** Future map topology version. */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+ private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+
+ /** Completion future for a particular topology version. */
+ private GridFutureAdapter<Void> topCompleteFut;
/** Optional filter. */
private final CacheEntryPredicate[] filter;
@@ -246,8 +249,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
}
- /** {@inheritDoc} */
- @Override public boolean waitForPartitionExchange() {
+ /**
+ * @return {@code True} if this future should block partition map exchange.
+ */
+ private boolean waitForPartitionExchange() {
// Wait fast-map near atomic update futures in CLOCK mode.
return fastMap;
}
@@ -323,13 +328,36 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
else {
topLocked = true;
- // Cannot remap.
- remapCnt.set(1);
+ synchronized (this) {
+ this.topVer = topVer;
+
+ // Cannot remap.
+ remapCnt.set(1);
+ }
map0(topVer, null, false, null);
}
}
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ if (waitForPartitionExchange() && topologyVersion().compareTo(topVer) < 0) {
+ synchronized (this) {
+ if (this.topVer == AffinityTopologyVersion.ZERO)
+ return null;
+
+ if (this.topVer.compareTo(topVer) < 0) {
+ if (topCompleteFut == null)
+ topCompleteFut = new GridFutureAdapter<>();
+
+ return topCompleteFut;
+ }
+ }
+ }
+
+ return null;
+ }
+
/**
* @param failed Keys to remap.
*/
@@ -339,14 +367,20 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
Collection<Object> remapKeys = new ArrayList<>(failed.size());
Collection<Object> remapVals = vals != null ? new ArrayList<>(failed.size()) : null;
+ Collection<GridCacheDrInfo> remapConflictPutVals = conflictPutVals != null ? new ArrayList<GridCacheDrInfo>(failed.size()) : null;
+ Collection<GridCacheVersion> remapConflictRmvVals = conflictRmvVals != null ? new ArrayList<GridCacheVersion>(failed.size()) : null;
Iterator<?> keyIt = keys.iterator();
Iterator<?> valsIt = vals != null ? vals.iterator() : null;
+ Iterator<GridCacheDrInfo> conflictPutValsIt = conflictPutVals != null ? conflictPutVals.iterator() : null;
+ Iterator<GridCacheVersion> conflictRmvValsIt = conflictRmvVals != null ? conflictRmvVals.iterator() : null;
for (Object key : failed) {
while (keyIt.hasNext()) {
Object nextKey = keyIt.next();
Object nextVal = valsIt != null ? valsIt.next() : null;
+ GridCacheDrInfo nextConflictPutVal = conflictPutValsIt != null ? conflictPutValsIt.next() : null;
+ GridCacheVersion nextConflictRmvVal = conflictRmvValsIt != null ? conflictRmvValsIt.next() : null;
if (F.eq(key, nextKey)) {
remapKeys.add(nextKey);
@@ -354,6 +388,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (remapVals != null)
remapVals.add(nextVal);
+ if (remapConflictPutVals != null)
+ remapConflictPutVals.add(nextConflictPutVal);
+
+ if (remapConflictRmvVals != null)
+ remapConflictRmvVals.add(nextConflictRmvVal);
+
break;
}
}
@@ -361,13 +401,29 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
keys = remapKeys;
vals = remapVals;
+ conflictPutVals = remapConflictPutVals;
+ conflictRmvVals = remapConflictRmvVals;
- mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
single = null;
futVer = null;
err = null;
opRes = null;
- topVer = AffinityTopologyVersion.ZERO;
+
+ GridFutureAdapter<Void> fut0;
+
+ synchronized (this) {
+ mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
+
+ topVer = AffinityTopologyVersion.ZERO;
+
+ fut0 = topCompleteFut;
+
+ topCompleteFut = null;
+ }
+
+ if (fut0 != null)
+ fut0.onDone();
+
singleNodeId = null;
singleReq = null;
fastMapRemap = false;
@@ -405,6 +461,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (futVer != null)
cctx.mvcc().removeAtomicFuture(version());
+ GridFutureAdapter<Void> fut0;
+
+ synchronized (this) {
+ fut0 = topCompleteFut;
+ }
+
+ if (fut0 != null)
+ fut0.onDone();
+
return true;
}
@@ -544,6 +609,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return;
}
+
+ synchronized (this) {
+ this.topVer = topVer;
+ }
}
finally {
cache.topology().readUnlock();
@@ -559,7 +628,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
boolean remap = false;
synchronized (this) {
- if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
+ if (topVer != AffinityTopologyVersion.ZERO &&
+ ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty())) {
CachePartialUpdateCheckedException err0 = err;
if (err0 != null)
@@ -1040,7 +1110,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (err0 == null)
err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
- List<Object> keys = new ArrayList<>(failedKeys.size());
+ Collection<Object> keys = new ArrayList<>(failedKeys.size());
for (KeyCacheObject key : failedKeys)
keys.add(key.value(cctx.cacheObjectContext(), false));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index addf243d..4ca2995 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -210,7 +210,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
public static final int DFLT_ACK_SND_THRESHOLD = 16;
/** Default socket write timeout. */
- public static final long DFLT_SOCK_WRITE_TIMEOUT = 200;
+ public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
/** No-op runnable. */
private static final IgniteRunnable NOOP = new IgniteRunnable() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 054a110..b255558 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -41,6 +41,7 @@ import org.jsr166.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
import static org.apache.ignite.cache.CacheMode.*;
@@ -236,6 +237,8 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
System.err.println("FINISHED PUTS");
+ GridCacheMapEntry.debug = true;
+
// Start put threads.
IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -340,12 +343,12 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
}
catch (AssertionError e) {
if (r == 9) {
- System.err.println("Failed to verify cache contents: " + e.getMessage());
+ info("Failed to verify cache contents: " + e.getMessage());
throw e;
}
- System.err.println("Failed to verify cache contents, will retry: " + e.getMessage());
+ info("Failed to verify cache contents, will retry: " + e.getMessage());
// Give some time to finish async updates.
U.sleep(1000);