You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/07/28 12:08:02 UTC
[50/50] [abbrv] ignite git commit: ignite-5658 moved to stripes +
cleanup
ignite-5658 moved to stripes + cleanup
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/55f51b54
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/55f51b54
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/55f51b54
Branch: refs/heads/ignite-5658
Commit: 55f51b54f2ff4a256dda2709d05b6f56a17e8a41
Parents: d849764
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Jul 28 15:06:56 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Jul 28 15:06:56 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/IgniteDataStreamer.java | 14 +++-
.../ignite/internal/jdbc2/JdbcConnection.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 82 ++++++++------------
.../DataStreamProcessorSelfTest.java | 17 ++--
4 files changed, 49 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/55f51b54/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index e2473dc..0e84e36 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -72,7 +72,7 @@ import org.jetbrains.annotations.Nullable;
* this setting limits maximum allowed number of parallel buffered stream messages that
* are being processed on remote nodes. If this number is exceeded, then
* {@link #addData(Object, Object)} method will block to control memory utilization.
- * Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value.
+ * Default is equal to the CPU count on the remote node multiplied by {@link #DFLT_PARALLEL_OPS_MULTIPLIER}.
* </li>
* <li>
* {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially,
@@ -100,8 +100,8 @@ import org.jetbrains.annotations.Nullable;
* </ul>
*/
public interface IgniteDataStreamer<K, V> extends AutoCloseable {
- /** Default max concurrent put operations count. */
- public static final int DFLT_MAX_PARALLEL_OPS = 16;
+ /** Default concurrent put operations multiplier for CPU count. */
+ public static final int DFLT_PARALLEL_OPS_MULTIPLIER = 20;
/** Default per node buffer size. */
public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
@@ -193,6 +193,10 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
/**
* Gets maximum number of parallel stream operations for a single node.
+ * <p>
+ * If this property is not set (i.e. it is equal to {@code 0}), the limit defaults to the CPU count
+ * of the remote node multiplied by {@link #DFLT_PARALLEL_OPS_MULTIPLIER};
+ * otherwise the provided value is used.
*
* @return Maximum number of parallel stream operations for a single node.
*/
@@ -203,7 +207,9 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
* <p>
* This method should be called prior to {@link #addData(Object, Object)} call.
* <p>
- * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
+ * If this property is not set (i.e. it is equal to {@code 0}), the limit defaults to the CPU count
+ * of the remote node multiplied by {@link #DFLT_PARALLEL_OPS_MULTIPLIER};
+ * otherwise the provided value is used.
*
* @param parallelOps Maximum number of parallel stream operations for a single node.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/55f51b54/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 1bf51f2..eb443c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -186,7 +186,7 @@ public class JdbcConnection implements Connection {
streamNodeBufSize = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_BUF_SIZE,
String.valueOf(IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE)));
streamNodeParOps = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS,
- String.valueOf(IgniteDataStreamer.DFLT_MAX_PARALLEL_OPS)));
+ String.valueOf(IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER)));
String nodeIdProp = props.getProperty(PROP_NODE_ID);
http://git-wip-us.apache.org/repos/asf/ignite/blob/55f51b54/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 5d1b0a3..6e9fbb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.datastreamer;
+import java.lang.reflect.Array;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
@@ -62,6 +63,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -153,7 +155,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
/** */
- private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+ private int parallelOps;
/** */
private long timeout = DFLT_UNLIMIT_TIMEOUT;
@@ -828,6 +830,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
if (old != null)
buf = old;
+ else if (log.isInfoEnabled())
+ log.info("Created buffer for node [nodeId=" + nodeId +
+ ", parallelOps=" + buf.perNodeParallelOps +
+ ", stripes=" + buf.stripes.length + ']');
}
final Collection<DataStreamerEntry> entriesForNode = e.getValue();
@@ -1311,15 +1317,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** Active futures. */
private final Collection<IgniteInternalFuture<Object>> locFuts;
-// /** Buffered entries. */
-// private List<DataStreamerEntry> entries;
/** Buffered entries. */
- private final ConcurrentMap<Integer, PerPartitionBuffer> entriesMap =
- new ConcurrentHashMap8<>();
-
-// /** */
-// @GridToStringExclude
-// private GridFutureAdapter<Object> curFut;
+ private final PerStripeBuffer[] stripes;
/** Local node flag. */
private final boolean isLocNode;
@@ -1333,16 +1332,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** */
private final Semaphore sem;
-// /** Batch topology. */
-// private AffinityTopologyVersion batchTopVer;
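+ /** Parallel operations limit for this node: the configured value, or CPU count times the default multiplier if it is 0. */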
+ private final int perNodeParallelOps;
/** Closure to signal on task finish. */
@GridToStringExclude
- private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
- @Override public void apply(IgniteInternalFuture<Object> t) {
- signalTaskFinished(t);
- }
- };
+ private final IgniteInClosure<IgniteInternalFuture<Object>> signalC =
+ new IgniteInClosure<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> t) {
+ signalTaskFinished(t);
+ }
+ };
/**
* @param node Node.
@@ -1358,25 +1358,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
// Cache local node flag.
isLocNode = node.equals(ctx.discovery().localNode());
-// entries = newEntries();
-// curFut = new GridFutureAdapter<>();
-// curFut.listen(signalC);
+ perNodeParallelOps = parallelOps != 0 ?
+ parallelOps :
+ node.metrics().getTotalCpus() * DFLT_PARALLEL_OPS_MULTIPLIER;
- sem = new Semaphore(parallelOps);
- }
+ sem = new Semaphore(perNodeParallelOps);
-// /**
-// * @param remap Remapping flag.
-// */
-// private void renewBatch(boolean remap) {
-// entries = newEntries();
-// curFut = new GridFutureAdapter<>();
-//
-// batchTopVer = null;
-//
-// if (!remap)
-// curFut.listen(signalC);
-// }
+ stripes = (PerStripeBuffer[])Array.newInstance(PerStripeBuffer.class, node.metrics().getTotalCpus());
+
+ for (int i = 0; i < stripes.length; i++)
+ stripes[i] = new PerStripeBuffer(i, signalC);
+ }
/**
* @param newEntries Infos.
@@ -1401,15 +1393,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
// Init buffer.
int part = entry.getKey().partition();
- PerPartitionBuffer b = entriesMap.get(part);
-
- if (b == null) {
- PerPartitionBuffer old =
- entriesMap.putIfAbsent(part, b = new PerPartitionBuffer(part, signalC));
-
- if (old != null)
- b = old;
- }
+ PerStripeBuffer b = stripes[part % stripes.length];
synchronized (b) {
curFut0 = b.curFut;
@@ -1468,7 +1452,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
@Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
acquireRemapSemaphore();
- for (PerPartitionBuffer b : entriesMap.values()) {
+ for (PerStripeBuffer b : stripes) {
AffinityTopologyVersion batchTopVer = null;
List<DataStreamerEntry> entries0 = null;
GridFutureAdapter<Object> curFut0 = null;
@@ -1729,11 +1713,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
dep != null ? dep.classLoaderId() : null,
dep == null,
topVer,
- rcvr == ISOLATED_UPDATER ? partId : -1);
+ rcvr == ISOLATED_UPDATER ? partId : GridIoMessage.STRIPE_DISABLED_PART);
try {
ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req,
- partId == -1 ? plc : GridIoPolicy.SYSTEM_POOL);
+ req.partition() == GridIoMessage.STRIPE_DISABLED_PART ? plc : GridIoPolicy.SYSTEM_POOL);
if (log.isDebugEnabled())
log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
@@ -1779,7 +1763,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
// Make sure to complete current future.
GridFutureAdapter<Object> curFut0;
- for (PerPartitionBuffer b : entriesMap.values()) {
+ for (PerStripeBuffer b : stripes) {
synchronized (b) {
curFut0 = b.curFut;
}
@@ -2118,7 +2102,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/**
*
*/
- private class PerPartitionBuffer {
+ private class PerStripeBuffer {
/** */
private final int partId;
@@ -2138,7 +2122,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @param partId Stripe index (the parameter keeps its pre-rename name).
* @param c Signal closure.
*/
- public PerPartitionBuffer(
+ public PerStripeBuffer(
int partId,
IgniteInClosure<? super IgniteInternalFuture<Object>> c
) {
@@ -2167,7 +2151,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(PerPartitionBuffer.class, this, super.toString());
+ return S.toString(PerStripeBuffer.class, this, super.toString());
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/55f51b54/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index ec5e6d0..b8ca255 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -962,8 +962,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
- IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME);
- try {
+ try (IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
ldr.receiver(new StreamReceiver<String, String>() {
@Override public void receive(IgniteCache<String, String> cache,
Collection<Map.Entry<String, String>> entries) throws IgniteException {
@@ -972,6 +971,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
cache.put("key", threadName);
}
});
+
ldr.addData("key", "value");
ldr.tryFlush();
@@ -982,9 +982,6 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
}
}, 3_000);
}
- finally {
- ldr.close(true);
- }
assertNotNull(cache.get("key"));
@@ -1011,9 +1008,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
- IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME);
-
- try {
+ try (IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME)) {
ldr.receiver(new StringStringStreamReceiver());
ldr.addData("key", "value");
@@ -1021,14 +1016,12 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
ldr.tryFlush();
GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
+ @Override
+ public boolean apply() {
return cache.get("key") != null;
}
}, 3_000);
}
- finally {
- ldr.close(true);
- }
assertNotNull(cache.get("key"));