You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/10/13 17:43:22 UTC
[37/50] [abbrv] ignite git commit: IGNITE-6051 Improve future
listeners model in DataStreamerImpl
IGNITE-6051 Improve future listeners model in DataStreamerImpl
(cherry picked from commit 18ca0b2)
(cherry picked from commit cec55c3)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1faa8dbe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1faa8dbe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1faa8dbe
Branch: refs/heads/ignite-2.1.5-p1
Commit: 1faa8dbed4009b691ee0fd5920aead8ec3e6719c
Parents: 71cd1e9
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon Aug 21 17:28:25 2017 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Fri Sep 22 15:28:40 2017 +0300
----------------------------------------------------------------------
.../datastreamer/DataStreamerImpl.java | 63 ++++++--
.../util/future/GridCompoundFuture.java | 40 ++++--
.../datastreamer/DataStreamerImplSelfTest.java | 143 ++++++++++++++++++-
3 files changed, 221 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1faa8dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 1869dcf..6ed552a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -941,10 +941,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
};
+ GridCompoundFuture opFut = new SilentCompoundFuture();
+
+ opFut.listen(lsnr);
+
final List<GridFutureAdapter<?>> futs;
try {
- futs = buf.update(entriesForNode, topVer, lsnr, remap);
+ futs = buf.update(entriesForNode, topVer, opFut, remap);
+
+ opFut.markInitialized();
}
catch (IgniteInterruptedCheckedException e1) {
resFut.onDone(e1);
@@ -1382,7 +1388,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/**
* @param newEntries Infos.
* @param topVer Topology version.
- * @param lsnr Listener for the operation future.
+ * @param opFut Completion future for the operation.
* @param remap Remapping flag.
* @return Future for operation.
* @throws IgniteInterruptedCheckedException If failed.
@@ -1390,10 +1396,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
@Nullable List<GridFutureAdapter<?>> update(
Iterable<DataStreamerEntry> newEntries,
AffinityTopologyVersion topVer,
- IgniteInClosure<IgniteInternalFuture<?>> lsnr,
+ GridCompoundFuture opFut,
boolean remap
) throws IgniteInterruptedCheckedException {
List<GridFutureAdapter<?>> res = null;
+ GridFutureAdapter[] futs = new GridFutureAdapter[stripes.length];
for (DataStreamerEntry entry : newEntries) {
List<DataStreamerEntry> entries0 = null;
@@ -1408,9 +1415,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
synchronized (b) {
curFut0 = b.curFut;
- // Listener should be added only once per whole entries collection.
- // Should we simplify the model and get rid of all futures?
- curFut0.listen(lsnr);
+ if (futs[b.partId] != curFut0) {
+ opFut.add(curFut0);
+
+ if (res == null)
+ res = new ArrayList<>();
+
+ res.add(curFut0);
+
+ futs[b.partId] = curFut0;
+ }
if (b.batchTopVer == null)
b.batchTopVer = topVer;
@@ -1426,14 +1440,28 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
}
- if (res == null)
- res = new ArrayList<>();
+ if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
+ for (int i = 0; i < stripes.length; i++) {
+ PerStripeBuffer b0 = stripes[i];
- res.add(curFut0);
+ // renew all stale versions
+ synchronized (b0) {
+ // Another thread might already renew the batch
+ AffinityTopologyVersion bTopVer = b0.batchTopVer;
- if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
- b.renewBatch(remap);
+ if(bTopVer != null && topVer.compareTo(bTopVer) > 0) {
+ GridFutureAdapter<Object> bFut = b0.curFut;
+
+ b0.renewBatch(remap);
+ bFut.onDone(null,
+ new IgniteCheckedException("Topology changed during batch preparation " +
+ "[batchTopVer=" + bTopVer + ", topVer=" + topVer + "]"));
+ }
+ }
+ }
+
+ // double check, it's possible that current future was already overwritten on buffer overflow
curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." +
"[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]"));
}
@@ -2182,4 +2210,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
return S.toString(PerStripeBuffer.class, this, super.toString());
}
}
+
+ /** */
+ private static final class SilentCompoundFuture<T,R> extends GridCompoundFuture<T,R> {
+ /** {@inheritDoc} */
+ @Override protected void logError(IgniteLogger log, String msg, Throwable e) {
+ // no-op
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void logDebug(IgniteLogger log, String msg) {
+ // no-op
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1faa8dbe/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 74a8f41..80cf67b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -96,13 +96,13 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
onDone(rdc.reduce());
}
catch (RuntimeException e) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
+ logError(null, "Failed to execute compound future reducer: " + this, e);
// Exception in reducer is a bug, so we bypass checkComplete here.
onDone(e);
}
catch (AssertionError e) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
+ logError(null, "Failed to execute compound future reducer: " + this, e);
// Bypass checkComplete because need to rethrow.
onDone(e);
@@ -117,25 +117,21 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
}
catch (IgniteCheckedException e) {
if (!ignoreFailure(e)) {
- if (e instanceof NodeStoppingException) {
- IgniteLogger log = logger();
-
- if (log != null && log.isDebugEnabled())
- log.debug("Failed to execute compound future reducer, node stopped.");
- }
+ if (e instanceof NodeStoppingException)
+ logDebug(logger(), "Failed to execute compound future reducer, node stopped.");
else
- U.error(null, "Failed to execute compound future reducer: " + this, e);
+ logError(null, "Failed to execute compound future reducer: " + this, e);
onDone(e);
}
}
catch (RuntimeException e) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
+ logError(null, "Failed to execute compound future reducer: " + this, e);
onDone(e);
}
catch (AssertionError e) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
+ logError(null, "Failed to execute compound future reducer: " + this, e);
// Bypass checkComplete because need to rethrow.
onDone(e);
@@ -278,12 +274,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
onDone(rdc != null ? rdc.reduce() : null);
}
catch (RuntimeException e) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
+ logError(null, "Failed to execute compound future reducer: " + this, e);
onDone(e);
}
catch (AssertionError e) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
+ logError(null, "Failed to execute compound future reducer: " + this, e);
onDone(e);
@@ -293,6 +289,24 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
}
/**
+ * @param log IgniteLogger.
+ * @param msg ShortMessage.
+ * @param e Exception.
+ */
+ protected void logError(IgniteLogger log, String msg, Throwable e) {
+ U.error(log, msg, e);
+ }
+
+ /**
+ * @param log IgniteLogger.
+ * @param msg ShortMessage.
+ */
+ protected void logDebug(IgniteLogger log, String msg) {
+ if (log != null && log.isDebugEnabled())
+ log.debug(msg);
+ }
+
+ /**
* Returns future at the specified position in this list.
*
* @param idx - index index of the element to return
http://git-wip-us.apache.org/repos/asf/ignite/blob/1faa8dbe/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index 6d3466b..e90f6b0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -18,10 +18,17 @@
package org.apache.ignite.internal.processors.datastreamer;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -31,11 +38,13 @@ import org.apache.ignite.cache.CacheServerNotFoundException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -45,6 +54,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.TransactionException;
import org.apache.log4j.Appender;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
@@ -105,6 +115,8 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
+ cnt = 0;
+
startGrids(5);
final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -252,6 +264,135 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testAllOperationFinishedBeforeFutureCompletion() throws Exception {
+ cnt = 0;
+
+ Ignite ignite = startGrids(MAX_CACHE_COUNT);
+
+ final IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<Throwable> ex = new AtomicReference<>();
+
+ Collection<Map.Entry> entries = new ArrayList<>(100);
+
+ for (int i = 0; i < 100; i++)
+ entries.add(new IgniteBiTuple<>(i, "" + i));
+
+ IgniteDataStreamer ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME);
+
+ ldr.addData(entries).listen(new IgniteInClosure<IgniteFuture<?>>() {
+ @Override public void apply(IgniteFuture<?> future) {
+ try {
+ future.get();
+
+ for (int i = 0; i < 100; i++)
+ assertEquals("" + i, cache.get(i));
+ }
+ catch (Throwable e) {
+ ex.set(e);
+ }
+
+ latch.countDown();
+ }
+ });
+
+ ldr.tryFlush();
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ Throwable e = ex.get();
+
+ if(e != null) {
+ if(e instanceof Error)
+ throw (Error) e;
+
+ if(e instanceof RuntimeException)
+ throw (RuntimeException) e;
+
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemapOnTopologyChangeDuringUpdatePreparation() throws Exception {
+ cnt = 0;
+
+ Ignite ignite = startGrids(MAX_CACHE_COUNT);
+
+ final int threads = 8;
+ final int entries = threads * 10000;
+ final long timeout = 10000;
+
+ final CountDownLatch l1 = new CountDownLatch(threads);
+ final CountDownLatch l2 = new CountDownLatch(1);
+ final AtomicInteger cntr = new AtomicInteger();
+
+ final AtomicReference<Throwable> ex = new AtomicReference<>();
+
+ final IgniteDataStreamer ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME);
+
+ final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ int i = cntr.getAndIncrement();
+
+ for (int j = 0; i < (entries >> 1); i += threads) {
+ ldr.addData(i, i);
+
+ if(j++ % 1000 == 0)
+ ldr.tryFlush();
+ }
+
+ l1.countDown();
+
+ assertTrue(l2.await(timeout, TimeUnit.MILLISECONDS));
+
+ for (int j = 0; i < entries; i += threads) {
+ ldr.addData(i, i);
+
+ if(j++ % 1000 == 0)
+ ldr.tryFlush();
+ }
+ }
+ catch (Throwable e) {
+ ex.compareAndSet(null, e);
+ }
+ }
+ }, threads, "loader");
+
+ assertTrue(l1.await(timeout, TimeUnit.MILLISECONDS));
+
+ stopGrid(MAX_CACHE_COUNT - 1);
+
+ l2.countDown();
+
+ fut.get(timeout);
+
+ ldr.close();
+
+ Throwable e = ex.get();
+
+ if(e != null) {
+ if(e instanceof Error)
+ throw (Error) e;
+
+ if(e instanceof RuntimeException)
+ throw (RuntimeException) e;
+
+ throw new RuntimeException(e);
+ }
+
+ IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+ for(int i = 0; i < entries; i++)
+ assertEquals(i, cache.get(i));
+ }
+
+ /**
* Cluster topology mismatch shall result in DataStreamer retrying cache update with the latest topology and
* no error logged to the console.
*
@@ -366,4 +507,4 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
super.sendMessage(node, msg, ackC);
}
}
-}
\ No newline at end of file
+}