You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/07/28 12:08:02 UTC

[50/50] [abbrv] ignite git commit: ignite-5658 moved to stripes + cleanup

ignite-5658 moved to stripes + cleanup


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/55f51b54
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/55f51b54
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/55f51b54

Branch: refs/heads/ignite-5658
Commit: 55f51b54f2ff4a256dda2709d05b6f56a17e8a41
Parents: d849764
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Jul 28 15:06:56 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Jul 28 15:06:56 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/IgniteDataStreamer.java   | 14 +++-
 .../ignite/internal/jdbc2/JdbcConnection.java   |  2 +-
 .../datastreamer/DataStreamerImpl.java          | 82 ++++++++------------
 .../DataStreamProcessorSelfTest.java            | 17 ++--
 4 files changed, 49 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/55f51b54/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index e2473dc..0e84e36 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -72,7 +72,7 @@ import org.jetbrains.annotations.Nullable;
  *      this setting limits maximum allowed number of parallel buffered stream messages that
  *      are being processed on remote nodes. If this number is exceeded, then
  *      {@link #addData(Object, Object)} method will block to control memory utilization.
- *      Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value.
+ *      Default is equal to the CPU count on the remote node multiplied by {@link #DFLT_PARALLEL_OPS_MULTIPLIER}.
  *  </li>
  *  <li>
  *      {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially,
@@ -100,8 +100,8 @@ import org.jetbrains.annotations.Nullable;
  * </ul>
  */
 public interface IgniteDataStreamer<K, V> extends AutoCloseable {
-    /** Default max concurrent put operations count. */
-    public static final int DFLT_MAX_PARALLEL_OPS = 16;
+    /** Default concurrent put operations multiplier for CPU count. */
+    public static final int DFLT_PARALLEL_OPS_MULTIPLIER = 20;
 
     /** Default per node buffer size. */
     public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
@@ -193,6 +193,10 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
 
     /**
      * Gets maximum number of parallel stream operations for a single node.
+     * <p>
+     * If not provided (is equal to {@code 0}), the effective value is the CPU count
+     * on the remote node multiplied by {@link #DFLT_PARALLEL_OPS_MULTIPLIER};
+     * otherwise the explicitly provided value is used.
      *
      * @return Maximum number of parallel stream operations for a single node.
      */
@@ -203,7 +207,9 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
      * <p>
      * This method should be called prior to {@link #addData(Object, Object)} call.
      * <p>
-     * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
+     * If not provided (is equal to {@code 0}), the effective value is the CPU count
+     * on the remote node multiplied by {@link #DFLT_PARALLEL_OPS_MULTIPLIER};
+     * otherwise the explicitly provided value is used.
      *
      * @param parallelOps Maximum number of parallel stream operations for a single node.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/55f51b54/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 1bf51f2..eb443c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -186,7 +186,7 @@ public class JdbcConnection implements Connection {
         streamNodeBufSize = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_BUF_SIZE,
             String.valueOf(IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE)));
         streamNodeParOps = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS,
-            String.valueOf(IgniteDataStreamer.DFLT_MAX_PARALLEL_OPS)));
+            String.valueOf(IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER)));
 
         String nodeIdProp = props.getProperty(PROP_NODE_ID);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/55f51b54/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 5d1b0a3..6e9fbb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.datastreamer;
 
+import java.lang.reflect.Array;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -62,6 +63,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -153,7 +155,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
 
     /** */
-    private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+    private int parallelOps;
 
     /** */
     private long timeout = DFLT_UNLIMIT_TIMEOUT;
@@ -828,6 +830,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                         if (old != null)
                             buf = old;
+                        else if (log.isInfoEnabled())
+                            log.info("Created buffer for node [nodeId=" + nodeId +
+                                ", parallelOps=" + buf.perNodeParallelOps +
+                                ", stripes=" + buf.stripes.length + ']');
                     }
 
                     final Collection<DataStreamerEntry> entriesForNode = e.getValue();
@@ -1311,15 +1317,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         /** Active futures. */
         private final Collection<IgniteInternalFuture<Object>> locFuts;
 
-//        /** Buffered entries. */
-//        private List<DataStreamerEntry> entries;
         /** Buffered entries. */
-        private final ConcurrentMap<Integer, PerPartitionBuffer> entriesMap =
-            new ConcurrentHashMap8<>();
-
-//        /** */
-//        @GridToStringExclude
-//        private GridFutureAdapter<Object> curFut;
+        private final PerStripeBuffer[] stripes;
 
         /** Local node flag. */
         private final boolean isLocNode;
@@ -1333,16 +1332,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         /** */
         private final Semaphore sem;
 
-//        /** Batch topology. */
-//        private AffinityTopologyVersion batchTopVer;
+        /** */
+        private final int perNodeParallelOps;
 
         /** Closure to signal on task finish. */
         @GridToStringExclude
-        private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
-            @Override public void apply(IgniteInternalFuture<Object> t) {
-                signalTaskFinished(t);
-            }
-        };
+        private final IgniteInClosure<IgniteInternalFuture<Object>> signalC =
+            new IgniteInClosure<IgniteInternalFuture<Object>>() {
+                @Override public void apply(IgniteInternalFuture<Object> t) {
+                    signalTaskFinished(t);
+                }
+            };
 
         /**
          * @param node Node.
@@ -1358,25 +1358,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             // Cache local node flag.
             isLocNode = node.equals(ctx.discovery().localNode());
 
-//            entries = newEntries();
-//            curFut = new GridFutureAdapter<>();
-//            curFut.listen(signalC);
+            perNodeParallelOps = parallelOps != 0 ?
+                parallelOps :
+                node.metrics().getTotalCpus() * DFLT_PARALLEL_OPS_MULTIPLIER;
 
-            sem = new Semaphore(parallelOps);
-        }
+            sem = new Semaphore(perNodeParallelOps);
 
-//        /**
-//         * @param remap Remapping flag.
-//         */
-//        private void renewBatch(boolean remap) {
-//            entries = newEntries();
-//            curFut = new GridFutureAdapter<>();
-//
-//            batchTopVer = null;
-//
-//            if (!remap)
-//                curFut.listen(signalC);
-//        }
+            stripes = (PerStripeBuffer[])Array.newInstance(PerStripeBuffer.class, node.metrics().getTotalCpus());
+
+            for (int i = 0; i < stripes.length; i++)
+                stripes[i] = new PerStripeBuffer(i, signalC);
+        }
 
         /**
          * @param newEntries Infos.
@@ -1401,15 +1393,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 // Init buffer.
                 int part = entry.getKey().partition();
 
-                PerPartitionBuffer b = entriesMap.get(part);
-
-                if (b == null) {
-                    PerPartitionBuffer old =
-                        entriesMap.putIfAbsent(part, b = new PerPartitionBuffer(part, signalC));
-
-                    if (old != null)
-                        b = old;
-                }
+                PerStripeBuffer b = stripes[part % stripes.length];
 
                 synchronized (b) {
                     curFut0 = b.curFut;
@@ -1468,7 +1452,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
             acquireRemapSemaphore();
 
-            for (PerPartitionBuffer b : entriesMap.values()) {
+            for (PerStripeBuffer b : stripes) {
                 AffinityTopologyVersion batchTopVer = null;
                 List<DataStreamerEntry> entries0 = null;
                 GridFutureAdapter<Object> curFut0 = null;
@@ -1729,11 +1713,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     dep != null ? dep.classLoaderId() : null,
                     dep == null,
                     topVer,
-                    rcvr == ISOLATED_UPDATER ? partId : -1);
+                    rcvr == ISOLATED_UPDATER ? partId : GridIoMessage.STRIPE_DISABLED_PART);
 
                 try {
                     ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req,
-                        partId == -1 ? plc : GridIoPolicy.SYSTEM_POOL);
+                        req.partition() == GridIoMessage.STRIPE_DISABLED_PART ? plc : GridIoPolicy.SYSTEM_POOL);
 
                     if (log.isDebugEnabled())
                         log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
@@ -1779,7 +1763,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             // Make sure to complete current future.
             GridFutureAdapter<Object> curFut0;
 
-            for (PerPartitionBuffer b : entriesMap.values()) {
+            for (PerStripeBuffer b : stripes) {
                 synchronized (b) {
                     curFut0 = b.curFut;
                 }
@@ -2118,7 +2102,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /**
      *
      */
-    private class PerPartitionBuffer {
+    private class PerStripeBuffer {
         /** */
         private final int partId;
 
@@ -2138,7 +2122,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
          * @param partId Partition ID.
          * @param c Signal closure.
          */
-        public PerPartitionBuffer(
+        public PerStripeBuffer(
             int partId,
             IgniteInClosure<? super IgniteInternalFuture<Object>> c
         ) {
@@ -2167,7 +2151,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(PerPartitionBuffer.class, this, super.toString());
+            return S.toString(PerStripeBuffer.class, this, super.toString());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/55f51b54/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index ec5e6d0..b8ca255 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -962,8 +962,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
 
             final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-            IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME);
-            try {
+            try (IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
                 ldr.receiver(new StreamReceiver<String, String>() {
                     @Override public void receive(IgniteCache<String, String> cache,
                         Collection<Map.Entry<String, String>> entries) throws IgniteException {
@@ -972,6 +971,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
                         cache.put("key", threadName);
                     }
                 });
+
                 ldr.addData("key", "value");
 
                 ldr.tryFlush();
@@ -982,9 +982,6 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
                     }
                 }, 3_000);
             }
-            finally {
-                ldr.close(true);
-            }
 
             assertNotNull(cache.get("key"));
 
@@ -1011,9 +1008,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
 
             final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-            IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME);
-
-            try {
+            try (IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME)) {
                 ldr.receiver(new StringStringStreamReceiver());
 
                 ldr.addData("key", "value");
@@ -1021,14 +1016,12 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
                 ldr.tryFlush();
 
                 GridTestUtils.waitForCondition(new GridAbsPredicate() {
-                    @Override public boolean apply() {
+                    @Override
+                    public boolean apply() {
                         return cache.get("key") != null;
                     }
                 }, 3_000);
             }
-            finally {
-                ldr.close(true);
-            }
 
             assertNotNull(cache.get("key"));