You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/07/14 10:55:40 UTC

ignite git commit: wip on data streamer

Repository: ignite
Updated Branches:
  refs/heads/ignite-5658 [created] 6fc5ad90f


wip on data streamer


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6fc5ad90
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6fc5ad90
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6fc5ad90

Branch: refs/heads/ignite-5658
Commit: 6fc5ad90fa5af84e1f30d3efd669d2fade10f83d
Parents: 7338445
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Jul 14 13:55:13 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Jul 14 13:55:13 2017 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamerImpl.java          | 236 +++++++++++++------
 .../DataStreamProcessorSelfTest.java            |  93 ++++----
 2 files changed, 213 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6fc5ad90/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index df51fac..c25a5ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1310,12 +1310,15 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         /** Active futures. */
         private final Collection<IgniteInternalFuture<Object>> locFuts;
 
+//        /** Buffered entries. */
+//        private List<DataStreamerEntry> entries;
         /** Buffered entries. */
-        private List<DataStreamerEntry> entries;
+        private final ConcurrentMap<Integer, PerPartitionBuffer> entriesMap =
+            new ConcurrentHashMap8<>();
 
-        /** */
-        @GridToStringExclude
-        private GridFutureAdapter<Object> curFut;
+//        /** */
+//        @GridToStringExclude
+//        private GridFutureAdapter<Object> curFut;
 
         /** Local node flag. */
         private final boolean isLocNode;
@@ -1329,8 +1332,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         /** */
         private final Semaphore sem;
 
-        /** Batch topology. */
-        private AffinityTopologyVersion batchTopVer;
+//        /** Batch topology. */
+//        private AffinityTopologyVersion batchTopVer;
 
         /** Closure to signal on task finish. */
         @GridToStringExclude
@@ -1354,25 +1357,25 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             // Cache local node flag.
             isLocNode = node.equals(ctx.discovery().localNode());
 
-            entries = newEntries();
-            curFut = new GridFutureAdapter<>();
-            curFut.listen(signalC);
+//            entries = newEntries();
+//            curFut = new GridFutureAdapter<>();
+//            curFut.listen(signalC);
 
             sem = new Semaphore(parallelOps);
         }
 
-        /**
-         * @param remap Remapping flag.
-         */
-        private void renewBatch(boolean remap) {
-            entries = newEntries();
-            curFut = new GridFutureAdapter<>();
-
-            batchTopVer = null;
-
-            if (!remap)
-                curFut.listen(signalC);
-        }
+//        /**
+//         * @param remap Remapping flag.
+//         */
+//        private void renewBatch(boolean remap) {
+//            entries = newEntries();
+//            curFut = new GridFutureAdapter<>();
+//
+//            batchTopVer = null;
+//
+//            if (!remap)
+//                curFut.listen(signalC);
+//        }
 
         /**
          * @param newEntries Infos.
@@ -1382,51 +1385,69 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
          * @return Future for operation.
          * @throws IgniteInterruptedCheckedException If failed.
          */
-        @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
+        @Nullable GridFutureAdapter<?> update(
+            Iterable<DataStreamerEntry> newEntries,
             AffinityTopologyVersion topVer,
             IgniteInClosure<IgniteInternalFuture<?>> lsnr,
-            boolean remap) throws IgniteInterruptedCheckedException {
-            List<DataStreamerEntry> entries0 = null;
+            boolean remap
+        ) throws IgniteInterruptedCheckedException {
+            GridFutureAdapter<Object> curFut0 = null;
 
-            GridFutureAdapter<Object> curFut0;
+            for (DataStreamerEntry entry : newEntries) {
+                List<DataStreamerEntry> entries0 = null;
+                AffinityTopologyVersion curBatchTopVer;
+
+                // Init buffer.
+                int part = entry.getKey().partition();
+
+                PerPartitionBuffer b = entriesMap.get(part);
 
-            AffinityTopologyVersion curBatchTopVer;
+                if (b == null) {
+                    PerPartitionBuffer old =
+                        entriesMap.putIfAbsent(part, b = new PerPartitionBuffer(part, signalC));
+
+                    if (old != null)
+                        b = old;
+                }
 
-            synchronized (this) {
-                curFut0 = curFut;
+                synchronized (b) {
+                    curFut0 = b.curFut;
 
-                curFut0.listen(lsnr);
+                    // TODO: think over proper listener model:
+                    // Listener should be added only once per whole entries collection.
+                    // Should we simplify the model and get rid of all futures?
+                    curFut0.listen(lsnr);
 
-                if (batchTopVer == null)
-                    batchTopVer = topVer;
+                    if (b.batchTopVer == null)
+                        b.batchTopVer = topVer;
 
-                curBatchTopVer = batchTopVer;
+                    curBatchTopVer = b.batchTopVer;
 
-                for (DataStreamerEntry entry : newEntries)
-                    entries.add(entry);
+                    b.entries.add(entry);
 
-                if (entries.size() >= bufSize) {
-                    entries0 = entries;
+                    if (b.entries.size() >= bufSize) {
+                        entries0 = b.entries;
 
-                    renewBatch(remap);
+                        b.renewBatch(remap);
+                    }
                 }
-            }
 
-            if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
-                renewBatch(remap);
+                if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
+                    b.renewBatch(remap);
 
-                curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." +
-                    "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]"));
-            }
-            else if (entries0 != null) {
-                submit(entries0, curBatchTopVer, curFut0, remap);
-
-                if (cancelled)
-                    curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
-                        DataStreamerImpl.this));
-                else if (ctx.clientDisconnected())
-                    curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
-                        "Client node disconnected."));
+                    curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." +
+                        "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]"));
+                }
+                else if (entries0 != null) {
+                    submit(entries0, curBatchTopVer, curFut0, remap);
+
+                    if (cancelled)
+                        curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+                            DataStreamerImpl.this));
+                    else if (ctx.clientDisconnected())
+                        curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                            "Client node disconnected."));
+                }
             }
 
             return curFut0;
@@ -1449,19 +1470,22 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             acquireRemapSemaphore();
 
-            synchronized (this) {
-                if (!entries.isEmpty()) {
-                    entries0 = entries;
-                    curFut0 = curFut;
+            for (PerPartitionBuffer b : entriesMap.values()) {
+                AffinityTopologyVersion batchTopVer = null;
 
-                    entries = newEntries();
-                    curFut = new GridFutureAdapter<>();
-                    curFut.listen(signalC);
+                synchronized (b) {
+                    if (!b.entries.isEmpty()) {
+                        entries0 = b.entries;
+                        curFut0 = b.curFut;
+                        batchTopVer = b.batchTopVer;
+
+                        b.renewBatch(false);
+                    }
                 }
-            }
 
-            if (entries0 != null)
-                submit(entries0, batchTopVer, curFut0, false);
+                if (entries0 != null)
+                    submit(entries0, batchTopVer, curFut0, false);
+            }
 
             // Create compound future for this flush.
             GridCompoundFuture<Object, Object> res = null;
@@ -1492,8 +1516,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
          * @throws IgniteInterruptedCheckedException If thread has been interrupted.
          */
         private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
-            if (timeout == DFLT_UNLIMIT_TIMEOUT)
+            if (timeout == DFLT_UNLIMIT_TIMEOUT) {
+                if (sem.availablePermits() <= 0)
+                    U.debug(log, "No Permits node=" + node.order() + " " + node.isLocal() + " " + isLocNode);
                 U.acquire(sem);
+            }
             else if (!U.tryAcquire(sem, timeout, TimeUnit.MILLISECONDS)) {
                 if (log.isDebugEnabled())
                     log.debug("Failed to add parallel operation.");
@@ -1508,7 +1535,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         private void signalTaskFinished(IgniteInternalFuture<Object> f) {
             assert f != null;
 
+
             sem.release();
+            U.debug(log, "Released: " + sem.availablePermits() + " node=" + node.order());
         }
 
         /**
@@ -1603,10 +1632,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
          * @param remap Remapping flag.
          * @throws IgniteInterruptedCheckedException If interrupted.
          */
-        private void submit(final Collection<DataStreamerEntry> entries,
+        private void submit(
+            final Collection<DataStreamerEntry> entries,
             @Nullable AffinityTopologyVersion topVer,
             final GridFutureAdapter<Object> curFut,
-            boolean remap)
+            boolean remap
+        )
             throws IgniteInterruptedCheckedException {
             assert entries != null;
             assert !entries.isEmpty();
@@ -1750,11 +1781,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             // Make sure to complete current future.
             GridFutureAdapter<Object> curFut0;
 
-            synchronized (this) {
-                curFut0 = curFut;
-            }
+            for (PerPartitionBuffer b : entriesMap.values()) {
+                synchronized (b) {
+                    curFut0 = b.curFut;
+                }
 
-            curFut0.onDone(e);
+                curFut0.onDone(e);
+            }
         }
 
         /**
@@ -1828,11 +1861,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            int size;
+            int size = -1;
 
-            synchronized (this) {
-                size = entries.size();
-            }
+            // TODO
+//            synchronized (this) {
+//                size = entries.size();
+//            }
 
             return S.toString(Buffer.class, this,
                 "entriesCnt", size,
@@ -2080,4 +2114,60 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             return S.toString(KeyCacheObjectWrapper.class, this);
         }
     }
+
+    /**
+     * Buffer for a single partition: holds the current batch entries, its future and its topology version.
+     */
+    private class PerPartitionBuffer {
+        /** Partition ID. */
+        private final int partId;
+
+        /** Entries of the current batch; replaced with a fresh list by {@link #renewBatch(boolean)}. */
+        private List<DataStreamerEntry> entries;
+
+        /** Future of the current batch; replaced with a new one by {@link #renewBatch(boolean)}. */
+        private GridFutureAdapter<Object> curFut;
+
+        /** Batch topology. */
+        private AffinityTopologyVersion batchTopVer;
+
+        /** Signal closure; registered on each new batch future unless the batch is renewed for remap. */
+        private final IgniteInClosure<? super IgniteInternalFuture<Object>> signalC;
+
+        /**
+         * @param partId Partition ID.
+         * @param c Signal closure.
+         */
+        public PerPartitionBuffer(
+            int partId,
+            IgniteInClosure<? super IgniteInternalFuture<Object>> c
+        ) {
+            this.partId = partId;
+            signalC = c;
+
+            renewBatch(false);
+        }
+
+        synchronized void renewBatch(boolean remap) {
+            entries = newEntries();
+            curFut = new GridFutureAdapter<>();
+            // Reset to null so that the next update() call assigns its own topology version to the batch.
+            batchTopVer = null;
+            // The signal closure is not registered on a future created for a remap batch.
+            if (!remap)
+                curFut.listen(signalC);
+        }
+
+        /**
+         * @return Fresh collection with some space for outgrowth.
+         */
+        private List<DataStreamerEntry> newEntries() {
+            return new ArrayList<>((int)(bufSize * 1.2));
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PerPartitionBuffer.class, this, super.toString());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6fc5ad90/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index ec5e6d0..e33d811 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -473,7 +473,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
             final AtomicBoolean done = new AtomicBoolean();
 
             try {
-                final int totalPutCnt = 50000;
+                final int totalPutCnt = 10000;
 
                 IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() {
                     @Override public Object call() throws Exception {
@@ -491,60 +491,67 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
                             futs.add(ldr.addData(idx, idx));
                         }
 
-                        ldr.flush();
+//                        ldr.flush();
 
-                        for (IgniteFuture<?> fut : futs)
-                            fut.get();
+//                        for (IgniteFuture<?> fut : futs) {
+//                            info("Before: " + fut);
+//
+//                            fut.get();
+//
+//                            info("After.");
+//                        }
 
                         return null;
                     }
                 }, 5, "producer");
 
-                IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        while (!done.get()) {
-                            ldr.flush();
-
-                            U.sleep(100);
-                        }
-
-                        return null;
-                    }
-                }, 1, "flusher");
+//                IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
+//                    @Override public Object call() throws Exception {
+//                        while (!done.get()) {
+//                            ldr.flush();
+//
+//                            U.sleep(100);
+//                        }
+//
+//                        return null;
+//                    }
+//                }, 1, "flusher");
 
                 // Define index of node being restarted.
                 final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
 
-                IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        try {
-                            for (int i = 0; i < 5; i++) {
-                                Ignite g = startGrid(restartNodeIdx);
-
-                                UUID id = g.cluster().localNode().id();
-
-                                info(">>>>>>> Started node: " + id);
-
-                                U.sleep(1000);
-
-                                stopGrid(getTestIgniteInstanceName(restartNodeIdx), true);
-
-                                info(">>>>>>> Stopped node: " + id);
-                            }
-                        }
-                        finally {
-                            done.set(true);
-
-                            info("Start stop thread finished.");
-                        }
-
-                        return null;
-                    }
-                }, 1, "start-stop-thread");
+                //TODO Uncomment
+
+//                IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() {
+//                    @Override public Object call() throws Exception {
+//                        try {
+//                            for (int i = 0; i < 5; i++) {
+//                                Ignite g = startGrid(restartNodeIdx);
+//
+//                                UUID id = g.cluster().localNode().id();
+//
+//                                info(">>>>>>> Started node: " + id);
+//
+//                                U.sleep(1000);
+//
+//                                stopGrid(getTestIgniteInstanceName(restartNodeIdx), true);
+//
+//                                info(">>>>>>> Stopped node: " + id);
+//                            }
+//                        }
+//                        finally {
+//                            done.set(true);
+//
+//                            info("Start stop thread finished.");
+//                        }
+//
+//                        return null;
+//                    }
+//                }, 1, "start-stop-thread");
 
                 fut1.get();
-                fut2.get();
-                fut3.get();
+//                fut2.get();
+//                fut3.get();
             }
             finally {
                 ldr.close(false);