You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/11/20 15:39:09 UTC
ignite git commit: IGNITE-5195 DataStreamer can fail if a non-data
node enters/leaves the grid. This closes #3026.
Repository: ignite
Updated Branches:
refs/heads/master 43aa4a893 -> 5fb04be39
IGNITE-5195 DataStreamer can fail if a non-data node enters/leaves the grid. This closes #3026.
Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fb04be3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fb04be3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fb04be3
Branch: refs/heads/master
Commit: 5fb04be3942d8a90288fdc75f86693872c918e2d
Parents: 43aa4a8
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Nov 20 18:37:44 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Nov 20 18:39:03 2017 +0300
----------------------------------------------------------------------
.../cache/GridCacheAffinityManager.java | 7 ++-
.../datastreamer/DataStreamerImpl.java | 20 ++++++-
.../cache/IgniteCacheDynamicStopSelfTest.java | 2 +-
.../datastreamer/DataStreamerImplSelfTest.java | 60 ++++++++++++++++++++
4 files changed, 85 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fb04be3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 14a1344..c9ee38c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -123,7 +123,12 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
if (cctx.isLocal())
topVer = LOC_CACHE_TOP_VER;
- return aff.assignments(topVer);
+ GridAffinityAssignmentCache aff0 = aff;
+
+ if (aff0 == null)
+ throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name());
+
+ return aff0.assignments(topVer);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fb04be3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index d38132f..12eb2dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -779,6 +779,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
else
topVer = ctx.cache().context().exchange().readyAffinityVersion();
+ List<List<ClusterNode>> assignments = cctx.affinity().assignments(topVer);
+
if (!allowOverwrite() && !cctx.isLocal()) { // Cases where cctx required.
gate = cctx.gate();
@@ -956,7 +958,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
final List<GridFutureAdapter<?>> futs;
try {
- futs = buf.update(entriesForNode, topVer, opFut, remap);
+ futs = buf.update(entriesForNode, topVer, assignments, opFut, remap);
opFut.markInitialized();
}
@@ -1411,6 +1413,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
@Nullable List<GridFutureAdapter<?>> update(
Iterable<DataStreamerEntry> newEntries,
AffinityTopologyVersion topVer,
+ List<List<ClusterNode>> assignments,
GridCompoundFuture opFut,
boolean remap
) throws IgniteInterruptedCheckedException {
@@ -1441,9 +1444,19 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
futs[b.partId] = curFut0;
}
- if (b.batchTopVer == null)
+ if (b.batchTopVer == null) {
+ b.batchTopVer = topVer;
+
+ b.assignments = assignments;
+ }
+
+ // topology changed, but affinity is the same, no re-map is required.
+ if (!topVer.equals(b.batchTopVer) && b.assignments.equals(assignments)) {
b.batchTopVer = topVer;
+ b.assignments = assignments;
+ }
+
curBatchTopVer = b.batchTopVer;
b.entries.add(entry);
@@ -2186,6 +2199,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** */
private final IgniteInClosure<? super IgniteInternalFuture<Object>> signalC;
+ /** Batch assignments */
+ public List<List<ClusterNode>> assignments;
+
/**
* @param partId Partition ID.
* @param c Signal closure.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fb04be3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
index 5628c4d..44cd475 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
@@ -142,4 +142,4 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest {
ignite(0).destroyCache(DEFAULT_CACHE_NAME);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5fb04be3/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index e90f6b0..940f8ce 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
@@ -38,6 +39,7 @@ import org.apache.ignite.cache.CacheServerNotFoundException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -431,6 +433,64 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testClientEventsNotCausingRemaps() throws Exception {
+ Ignite ignite = startGrids(2);
+
+ ignite.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME);
+
+ ((DataStreamerImpl)streamer).maxRemapCount(3);
+
+ streamer.addData(1, 1);
+
+ for (int topChanges = 0; topChanges < 30; topChanges++) {
+ IgniteEx node = startGrid(getConfiguration("flapping-client").setClientMode(true));
+
+ streamer.addData(1, 1);
+
+ node.close();
+
+ streamer.addData(1, 1);
+ }
+
+ streamer.flush();
+ streamer.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerEventsCauseRemaps() throws Exception {
+ Ignite ignite = startGrids(2);
+
+ ignite.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME);
+
+ ((DataStreamerImpl)streamer).maxRemapCount(0);
+
+ streamer.addData(1, 1);
+
+ startGrid(2);
+
+ try {
+ streamer.addData(1, 1);
+
+ streamer.flush();
+ }
+ catch (IllegalStateException ex) {
+ assert ex.getMessage().contains("Data streamer has been closed");
+
+ return;
+ }
+
+ fail("Expected exception wasn't thrown");
+ }
+
+ /**
* Gets cache configuration.
*
* @return Cache configuration.