You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/10/13 17:43:22 UTC

[37/50] [abbrv] ignite git commit: IGNITE-6051 Improve future listeners model in DataStreamerImpl

IGNITE-6051 Improve future listeners model in DataStreamerImpl

(cherry picked from commit 18ca0b2)

(cherry picked from commit cec55c3)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1faa8dbe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1faa8dbe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1faa8dbe

Branch: refs/heads/ignite-2.1.5-p1
Commit: 1faa8dbed4009b691ee0fd5920aead8ec3e6719c
Parents: 71cd1e9
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon Aug 21 17:28:25 2017 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Fri Sep 22 15:28:40 2017 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamerImpl.java          |  63 ++++++--
 .../util/future/GridCompoundFuture.java         |  40 ++++--
 .../datastreamer/DataStreamerImplSelfTest.java  | 143 ++++++++++++++++++-
 3 files changed, 221 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1faa8dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 1869dcf..6ed552a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -941,10 +941,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                         }
                     };
 
+                    GridCompoundFuture opFut = new SilentCompoundFuture();
+
+                    opFut.listen(lsnr);
+
                     final List<GridFutureAdapter<?>> futs;
 
                     try {
-                        futs = buf.update(entriesForNode, topVer, lsnr, remap);
+                        futs = buf.update(entriesForNode, topVer, opFut, remap);
+
+                        opFut.markInitialized();
                     }
                     catch (IgniteInterruptedCheckedException e1) {
                         resFut.onDone(e1);
@@ -1382,7 +1388,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         /**
          * @param newEntries Infos.
          * @param topVer Topology version.
-         * @param lsnr Listener for the operation future.
+         * @param opFut Completion future for the operation.
          * @param remap Remapping flag.
          * @return Future for operation.
          * @throws IgniteInterruptedCheckedException If failed.
@@ -1390,10 +1396,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         @Nullable List<GridFutureAdapter<?>> update(
             Iterable<DataStreamerEntry> newEntries,
             AffinityTopologyVersion topVer,
-            IgniteInClosure<IgniteInternalFuture<?>> lsnr,
+            GridCompoundFuture opFut,
             boolean remap
         ) throws IgniteInterruptedCheckedException {
             List<GridFutureAdapter<?>> res = null;
+            GridFutureAdapter[] futs = new GridFutureAdapter[stripes.length];
 
             for (DataStreamerEntry entry : newEntries) {
                 List<DataStreamerEntry> entries0 = null;
@@ -1408,9 +1415,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 synchronized (b) {
                     curFut0 = b.curFut;
 
-                    // Listener should be added only once per whole entries collection.
-                    // Should we simplify the model and get rid of all futures?
-                    curFut0.listen(lsnr);
+                    if (futs[b.partId] != curFut0) {
+                        opFut.add(curFut0);
+
+                        if (res == null)
+                            res = new ArrayList<>();
+
+                        res.add(curFut0);
+
+                        futs[b.partId] = curFut0;
+                    }
 
                     if (b.batchTopVer == null)
                         b.batchTopVer = topVer;
@@ -1426,14 +1440,28 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     }
                 }
 
-                if (res == null)
-                    res = new ArrayList<>();
+                if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
+                    for (int i = 0; i < stripes.length; i++) {
+                        PerStripeBuffer b0 = stripes[i];
 
-                res.add(curFut0);
+                        // renew all stale versions
+                        synchronized (b0) {
+                            // Another thread might already renew the batch
+                            AffinityTopologyVersion bTopVer = b0.batchTopVer;
 
-                if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
-                    b.renewBatch(remap);
+                            if(bTopVer != null && topVer.compareTo(bTopVer) > 0) {
+                                GridFutureAdapter<Object> bFut = b0.curFut;
+
+                                b0.renewBatch(remap);
 
+                                bFut.onDone(null,
+                                    new IgniteCheckedException("Topology changed during batch preparation " +
+                                        "[batchTopVer=" + bTopVer + ", topVer=" + topVer + "]"));
+                            }
+                        }
+                    }
+
+                    // double check, it's possible that current future was already overwritten on buffer overflow
                     curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." +
                         "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]"));
                 }
@@ -2182,4 +2210,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             return S.toString(PerStripeBuffer.class, this, super.toString());
         }
     }
+
+    /** Compound future whose {@code logError} and {@code logDebug} hooks are no-ops, so it never logs. */
+    private static final class SilentCompoundFuture<T,R> extends GridCompoundFuture<T,R> {
+        /** {@inheritDoc} */
+        @Override protected void logError(IgniteLogger log, String msg, Throwable e) {
+            // No-op: error logging is intentionally suppressed for this future.
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void logDebug(IgniteLogger log, String msg) {
+            // No-op: debug logging is intentionally suppressed for this future.
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1faa8dbe/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 74a8f41..80cf67b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -96,13 +96,13 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
                     onDone(rdc.reduce());
             }
             catch (RuntimeException e) {
-                U.error(null, "Failed to execute compound future reducer: " + this, e);
+                logError(null, "Failed to execute compound future reducer: " + this, e);
 
                 // Exception in reducer is a bug, so we bypass checkComplete here.
                 onDone(e);
             }
             catch (AssertionError e) {
-                U.error(null, "Failed to execute compound future reducer: " + this, e);
+                logError(null, "Failed to execute compound future reducer: " + this, e);
 
                 // Bypass checkComplete because need to rethrow.
                 onDone(e);
@@ -117,25 +117,21 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
         }
         catch (IgniteCheckedException e) {
             if (!ignoreFailure(e)) {
-                if (e instanceof NodeStoppingException) {
-                    IgniteLogger log = logger();
-
-                    if (log != null && log.isDebugEnabled())
-                        log.debug("Failed to execute compound future reducer, node stopped.");
-                }
+                if (e instanceof NodeStoppingException)
+                    logDebug(logger(), "Failed to execute compound future reducer, node stopped.");
                 else
-                    U.error(null, "Failed to execute compound future reducer: " + this, e);
+                    logError(null, "Failed to execute compound future reducer: " + this, e);
 
                 onDone(e);
             }
         }
         catch (RuntimeException e) {
-            U.error(null, "Failed to execute compound future reducer: " + this, e);
+            logError(null, "Failed to execute compound future reducer: " + this, e);
 
             onDone(e);
         }
         catch (AssertionError e) {
-            U.error(null, "Failed to execute compound future reducer: " + this, e);
+            logError(null, "Failed to execute compound future reducer: " + this, e);
 
             // Bypass checkComplete because need to rethrow.
             onDone(e);
@@ -278,12 +274,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
                 onDone(rdc != null ? rdc.reduce() : null);
             }
             catch (RuntimeException e) {
-                U.error(null, "Failed to execute compound future reducer: " + this, e);
+                logError(null, "Failed to execute compound future reducer: " + this, e);
 
                 onDone(e);
             }
             catch (AssertionError e) {
-                U.error(null, "Failed to execute compound future reducer: " + this, e);
+                logError(null, "Failed to execute compound future reducer: " + this, e);
 
                 onDone(e);
 
@@ -293,6 +289,24 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
     }
 
     /**
+     * @param log Logger passed through to {@code U.error}; callers in this class pass {@code null}.
+     * @param msg Short message.
+     * @param e Error to log together with the message.
+     */
+    protected void logError(IgniteLogger log, String msg, Throwable e) {
+        U.error(log, msg, e);
+    }
+
+    /**
+     * @param log Logger to write to; nothing is logged if it is {@code null} or debug level is disabled.
+     * @param msg Short message.
+     */
+    protected void logDebug(IgniteLogger log, String msg) {
+        if (log != null && log.isDebugEnabled())
+            log.debug(msg);
+    }
+
+    /**
      * Returns future at the specified position in this list.
      *
      * @param idx - index index of the element to return

http://git-wip-us.apache.org/repos/asf/ignite/blob/1faa8dbe/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index 6d3466b..e90f6b0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -18,10 +18,17 @@
 package org.apache.ignite.internal.processors.datastreamer;
 
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -31,11 +38,13 @@ import org.apache.ignite.cache.CacheServerNotFoundException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -45,6 +54,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.TransactionException;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Logger;
 import org.apache.log4j.SimpleLayout;
@@ -105,6 +115,8 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
+        cnt = 0;
+
         startGrids(5);
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -252,6 +264,135 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testAllOperationFinishedBeforeFutureCompletion() throws Exception {
+        cnt = 0;
+
+        Ignite ignite = startGrids(MAX_CACHE_COUNT);
+
+        final IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME);
+        final CountDownLatch latch = new CountDownLatch(1); // Released once the listener below has run.
+        final AtomicReference<Throwable> ex = new AtomicReference<>(); // Failure captured inside the listener.
+
+        Collection<Map.Entry> entries = new ArrayList<>(100);
+
+        for (int i = 0; i < 100; i++)
+            entries.add(new IgniteBiTuple<>(i, "" + i)); // Key i maps to its string form.
+
+        IgniteDataStreamer ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME);
+
+        ldr.addData(entries).listen(new IgniteInClosure<IgniteFuture<?>>() {
+            @Override public void apply(IgniteFuture<?> future) {
+                try {
+                    future.get();
+
+                    for (int i = 0; i < 100; i++)
+                        assertEquals("" + i, cache.get(i)); // Every added entry must be readable at this point.
+                }
+                catch (Throwable e) {
+                    ex.set(e); // Remember the failure; it is rethrown after the latch is released.
+                }
+
+                latch.countDown();
+            }
+        });
+
+        ldr.tryFlush();
+
+        assertTrue(latch.await(5, TimeUnit.SECONDS)); // The listener must have run within 5 seconds.
+
+        Throwable e = ex.get();
+
+        if(e != null) {
+            if(e instanceof Error)
+                throw (Error) e;
+
+            if(e instanceof RuntimeException)
+                throw (RuntimeException) e;
+
+            throw new RuntimeException(e); // Wrap any other throwable type.
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemapOnTopologyChangeDuringUpdatePreparation() throws Exception {
+        cnt = 0;
+
+        Ignite ignite = startGrids(MAX_CACHE_COUNT);
+
+        final int threads = 8;
+        final int entries = threads * 10000;
+        final long timeout = 10000; // Milliseconds; used for latch waits and for the loader future.
+
+        final CountDownLatch l1 = new CountDownLatch(threads); // Counted down by each loader after its first phase.
+        final CountDownLatch l2 = new CountDownLatch(1); // Released by the test body after a grid is stopped.
+        final AtomicInteger cntr = new AtomicInteger();
+
+        final AtomicReference<Throwable> ex = new AtomicReference<>();
+
+        final IgniteDataStreamer ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME);
+
+        final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    int i = cntr.getAndIncrement(); // Distinct start offset per loader; keys then step by 'threads'.
+
+                    for (int j = 0; i < (entries >> 1); i += threads) { // First phase: keys below entries / 2.
+                        ldr.addData(i, i);
+
+                        if(j++ % 1000 == 0)
+                            ldr.tryFlush();
+                    }
+
+                    l1.countDown();
+
+                    assertTrue(l2.await(timeout, TimeUnit.MILLISECONDS)); // Wait for the test body to stop a grid.
+
+                    for (int j = 0; i < entries; i += threads) { // Second phase: continue from where 'i' stopped.
+                        ldr.addData(i, i);
+
+                        if(j++ % 1000 == 0)
+                            ldr.tryFlush();
+                    }
+                }
+                catch (Throwable e) {
+                    ex.compareAndSet(null, e); // Keep only the first failure.
+                }
+            }
+        }, threads, "loader");
+
+        assertTrue(l1.await(timeout, TimeUnit.MILLISECONDS)); // All loaders finished the first phase.
+
+        stopGrid(MAX_CACHE_COUNT - 1); // Stop one grid between the two loading phases.
+
+        l2.countDown();
+
+        fut.get(timeout);
+
+        ldr.close();
+
+        Throwable e = ex.get();
+
+        if(e != null) {
+            if(e instanceof Error)
+                throw (Error) e;
+
+            if(e instanceof RuntimeException)
+                throw (RuntimeException) e;
+
+            throw new RuntimeException(e); // Wrap any other throwable type.
+        }
+
+        IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        for(int i = 0; i < entries; i++)
+            assertEquals(i, cache.get(i)); // Keys from both phases must be present.
+    }
+
+    /**
      * Cluster topology mismatch shall result in DataStreamer retrying cache update with the latest topology and
      * no error logged to the console.
      *
@@ -366,4 +507,4 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
             super.sendMessage(node, msg, ackC);
         }
     }
-}
\ No newline at end of file
+}