You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/10/13 17:43:20 UTC
[35/50] [abbrv] ignite git commit: IGNITE-5658 Optimizations for data
streamer
IGNITE-5658 Optimizations for data streamer
(cherry picked from commit aa81dd1)
(cherry picked from commit 1ad4f14)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/212603e1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/212603e1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/212603e1
Branch: refs/heads/ignite-2.1.5-p1
Commit: 212603e1a969b9320f40207cd9233bed9152b3e4
Parents: 98afbfd
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Wed Aug 9 19:29:39 2017 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Fri Sep 22 15:28:11 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/IgniteDataStreamer.java | 26 +-
.../apache/ignite/IgniteSystemProperties.java | 12 +-
.../ignite/internal/GridKernalContext.java | 2 +-
.../ignite/internal/GridKernalContextImpl.java | 6 +-
.../apache/ignite/internal/IgniteKernal.java | 5 +-
.../ignite/internal/IgniteNodeAttributes.java | 3 +
.../org/apache/ignite/internal/IgnitionEx.java | 23 +-
.../ignite/internal/jdbc2/JdbcConnection.java | 5 +-
.../managers/communication/GridIoManager.java | 6 +
.../managers/communication/GridIoMessage.java | 3 +
.../cache/persistence/freelist/PagesList.java | 67 +++--
.../wal/reader/StandaloneGridKernalContext.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 281 ++++++++++++-------
.../datastreamer/DataStreamerRequest.java | 59 +++-
.../ignite/internal/util/StripedExecutor.java | 89 +++++-
.../org/apache/ignite/thread/IgniteThread.java | 10 +
.../DataStreamProcessorSelfTest.java | 14 +-
.../datastreamer/DataStreamerImplSelfTest.java | 3 +-
18 files changed, 424 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index e2473dc..b1f5851 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -20,6 +20,7 @@ package org.apache.ignite;
import java.util.Collection;
import java.util.Map;
import javax.cache.CacheException;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.stream.StreamReceiver;
@@ -72,7 +73,7 @@ import org.jetbrains.annotations.Nullable;
* this setting limits maximum allowed number of parallel buffered stream messages that
* are being processed on remote nodes. If this number is exceeded, then
* {@link #addData(Object, Object)} method will block to control memory utilization.
- * Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value.
+ * Default is equal to CPU count on remote node multiply by {@link #DFLT_PARALLEL_OPS_MULTIPLIER}.
* </li>
* <li>
* {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially,
@@ -100,11 +101,23 @@ import org.jetbrains.annotations.Nullable;
* </ul>
*/
public interface IgniteDataStreamer<K, V> extends AutoCloseable {
- /** Default max concurrent put operations count. */
+ /**
+ * Default max concurrent put operations count.
+ * @deprecated Is not used anymore.
+ */
+ @Deprecated
public static final int DFLT_MAX_PARALLEL_OPS = 16;
- /** Default per node buffer size. */
- public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
+ /**
+ * Default multiplier for data streamer pool size to get concurrent batches count for each remote node.
+ *
+ * @see IgniteConfiguration#getDataStreamerThreadPoolSize()
+ * @see #perNodeParallelOperations()
+ */
+ public static final int DFLT_PARALLEL_OPS_MULTIPLIER = 8;
+
+ /** Default operations batch size to sent to remote node for loading. */
+ public static final int DFLT_PER_NODE_BUFFER_SIZE = 512;
/** Default timeout for streamer's operations. */
public static final long DFLT_UNLIMIT_TIMEOUT = -1;
@@ -203,9 +216,11 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
* <p>
* This method should be called prior to {@link #addData(Object, Object)} call.
* <p>
- * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
+ * If not provided, default value is calculated as follows
+ * {@link #DFLT_PARALLEL_OPS_MULTIPLIER} * {@code DATA_STREAMER_POOL_SIZE_ON_REMOTE_NODE}.
*
* @param parallelOps Maximum number of parallel stream operations for a single node.
+ * @see IgniteConfiguration#getDataStreamerThreadPoolSize()
*/
public void perNodeParallelOperations(int parallelOps);
@@ -450,5 +465,4 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
*/
@Override public void close() throws CacheException, IgniteInterruptedException,
IgniteDataStreamerTimeoutException;
-
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index f627e24..ec79026 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Properties;
import javax.net.ssl.HostnameVerifier;
import org.apache.ignite.cluster.ClusterGroup;
-import org.apache.ignite.configuration.PersistentStoreConfiguration;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
@@ -707,9 +706,12 @@ public final class IgniteSystemProperties {
public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = "IGNITE_ENABLE_FORCIBLE_NODE_KILL";
/**
- * If this property is set, then Ignite will use Async File IO factory by default.
+ * Tasks stealing will be started if tasks queue size per data-streamer thread exceeds this threshold.
+ * <p>
+ * Default value is {@code 4}.
*/
- public static final String IGNITE_USE_ASYNC_FILE_IO_FACTORY = "IGNITE_USE_ASYNC_FILE_IO_FACTORY";
+ public static final String IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD =
+ "IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD";
/**
* If the property is set {@link org.apache.ignite.internal.pagemem.wal.record.TxRecord} records
@@ -719,10 +721,6 @@ public final class IgniteSystemProperties {
*/
public static final String IGNITE_WAL_LOG_TX_RECORDS = "IGNITE_WAL_LOG_TX_RECORDS";
- /** If this property is set, {@link PersistentStoreConfiguration#writeThrottlingEnabled} will be overridden to true
- * independent of initial value in configuration. */
- public static final String IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED = "IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED";
-
/**
* Enforces singleton.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 93ae465..99c7cce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -531,7 +531,7 @@ public interface GridKernalContext extends Iterable<GridComponent> {
*
* @return Thread pool implementation to be used for data stream messages.
*/
- public ExecutorService getDataStreamerExecutorService();
+ public StripedExecutor getDataStreamerExecutorService();
/**
* Should return an instance of fully configured thread pool to be used for
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 0c80eae..07e5970 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -316,7 +316,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringExclude
- private ExecutorService dataStreamExecSvc;
+ private StripedExecutor dataStreamExecSvc;
/** */
@GridToStringExclude
@@ -422,7 +422,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
ExecutorService p2pExecSvc,
ExecutorService mgmtExecSvc,
ExecutorService igfsExecSvc,
- ExecutorService dataStreamExecSvc,
+ StripedExecutor dataStreamExecSvc,
ExecutorService restExecSvc,
ExecutorService affExecSvc,
@Nullable ExecutorService idxExecSvc,
@@ -975,7 +975,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
- @Override public ExecutorService getDataStreamerExecutorService() {
+ @Override public StripedExecutor getDataStreamerExecutorService() {
return dataStreamExecSvc;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index b7430da..3ed6447 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -213,6 +213,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
@@ -733,7 +734,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ExecutorService p2pExecSvc,
ExecutorService mgmtExecSvc,
ExecutorService igfsExecSvc,
- ExecutorService dataStreamExecSvc,
+ StripedExecutor dataStreamExecSvc,
ExecutorService restExecSvc,
ExecutorService affExecSvc,
@Nullable ExecutorService idxExecSvc,
@@ -1457,6 +1458,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
*/
@SuppressWarnings({"SuspiciousMethodCalls", "unchecked", "TypeMayBeWeakened"})
private void fillNodeAttributes(boolean notifyEnabled) throws IgniteCheckedException {
+ ctx.addNodeAttribute(ATTR_DATA_STREAMER_POOL_SIZE, configuration().getDataStreamerThreadPoolSize());
+
final String[] incProps = cfg.getIncludeProperties();
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index e4ed44a..024f339 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -180,6 +180,9 @@ public final class IgniteNodeAttributes {
/** Ignite security compatibility mode. */
public static final String ATTR_SECURITY_COMPATIBILITY_MODE = ATTR_PREFIX + ".security.compatibility.enabled";
+ /** */
+ public static final String ATTR_DATA_STREAMER_POOL_SIZE = ATTR_PREFIX + ".data.streamer.pool.size";
+
/** Memory configuration. */
public static final String ATTR_MEMORY_CONFIG = ATTR_PREFIX + ".memory";
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 23baeb3..07a5c43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1508,7 +1508,7 @@ public class IgnitionEx {
private ThreadPoolExecutor igfsExecSvc;
/** Data streamer executor service. */
- private ThreadPoolExecutor dataStreamerExecSvc;
+ private StripedExecutor dataStreamerExecSvc;
/** REST requests executor service. */
private ThreadPoolExecutor restExecSvc;
@@ -1728,7 +1728,11 @@ public class IgnitionEx {
validateThreadPoolSize(cfg.getStripedPoolSize(), "stripedPool");
- stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getIgniteInstanceName(), "sys", log);
+ stripedExecSvc = new StripedExecutor(
+ cfg.getStripedPoolSize(),
+ cfg.getIgniteInstanceName(),
+ "sys",
+ log);
// Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect.
@@ -1763,17 +1767,12 @@ public class IgnitionEx {
p2pExecSvc.allowCoreThreadTimeOut(true);
- // Note that we do not pre-start threads here as this pool may not be needed.
- dataStreamerExecSvc = new IgniteThreadPoolExecutor(
- "data-streamer",
- cfg.getIgniteInstanceName(),
- cfg.getDataStreamerThreadPoolSize(),
+ dataStreamerExecSvc = new StripedExecutor(
cfg.getDataStreamerThreadPoolSize(),
- DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>(),
- GridIoPolicy.DATA_STREAMER_POOL);
-
- dataStreamerExecSvc.allowCoreThreadTimeOut(true);
+ cfg.getIgniteInstanceName(),
+ "data-streamer",
+ log,
+ true);
// Note that we do not pre-start threads here as igfs pool may not be needed.
validateThreadPoolSize(cfg.getIgfsThreadPoolSize(), "IGFS");
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 6c8fc0b..6b8371d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -199,8 +199,9 @@ public class JdbcConnection implements Connection {
streamFlushTimeout = Long.parseLong(props.getProperty(PROP_STREAMING_FLUSH_FREQ, "0"));
streamNodeBufSize = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_BUF_SIZE,
String.valueOf(IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE)));
- streamNodeParOps = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS,
- String.valueOf(IgniteDataStreamer.DFLT_MAX_PARALLEL_OPS)));
+ // If value is zero, server data-streamer pool size multiplied
+ // by IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER will be used
+ streamNodeParOps = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS, "0"));
String nodeIdProp = props.getProperty(PROP_NODE_ID);
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index bb36b26..2005032 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1125,6 +1125,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
+ if (plc == GridIoPolicy.DATA_STREAMER_POOL && msg.partition() != GridIoMessage.STRIPE_DISABLED_PART) {
+ ctx.getDataStreamerExecutorService().execute(msg.partition(), c);
+
+ return;
+ }
+
if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index dccd336..fe61aec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.apache.ignite.internal.ExecutorAwareMessage;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -336,6 +337,8 @@ public class GridIoMessage implements Message {
public int partition() {
if (msg instanceof GridCacheMessage)
return ((GridCacheMessage)msg).partition();
+ if (msg instanceof DataStreamerRequest)
+ return ((DataStreamerRequest)msg).partition();
else
return STRIPE_DISABLED_PART;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
index 39a6865..6c355f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import static java.lang.Boolean.FALSE;
@@ -66,7 +67,7 @@ public abstract class PagesList extends DataStructure {
/** */
private static final int MAX_STRIPES_PER_BUCKET =
IgniteSystemProperties.getInteger("IGNITE_PAGES_LIST_STRIPES_PER_BUCKET",
- Math.min(8, Runtime.getRuntime().availableProcessors() * 2));
+ Math.max(8, Runtime.getRuntime().availableProcessors()));
/** */
protected final AtomicLong[] bucketsSize;
@@ -507,6 +508,21 @@ public abstract class PagesList extends DataStructure {
* @throws IgniteCheckedException If failed.
*/
private Stripe getPageForPut(int bucket) throws IgniteCheckedException {
+ // Striped pool optimization.
+ int stripeIdx; IgniteThread igniteThread = IgniteThread.current();
+
+ if (igniteThread != null && (stripeIdx = igniteThread.stripe()) != -1) {
+ Stripe[] tails = getBucket(bucket);
+
+ while (tails == null || stripeIdx >= tails.length) {
+ addStripe(bucket, true);
+
+ tails = getBucket(bucket);
+ }
+
+ return tails[stripeIdx];
+ }
+
Stripe[] tails = getBucket(bucket);
if (tails == null)
@@ -607,12 +623,8 @@ public abstract class PagesList extends DataStructure {
try {
long tailAddr = writeLockPage(tailId, tailPage, bucket, lockAttempt++); // Explicit check.
- if (tailAddr == 0L) {
- if (isReuseBucket(bucket) && lockAttempt == TRY_LOCK_ATTEMPTS)
- addStripeForReuseBucket(bucket);
-
+ if (tailAddr == 0L)
continue;
- }
assert PageIO.getPageId(tailAddr) == tailId : "pageId = " + PageIO.getPageId(tailAddr) + ", tailId = " + tailId;
assert PageIO.getType(tailAddr) == PageIO.T_PAGE_LIST_NODE;
@@ -912,13 +924,26 @@ public abstract class PagesList extends DataStructure {
* @param bucket Bucket index.
* @return Page for take.
*/
- private Stripe getPageForTake(int bucket) {
+ private Stripe getPageForTake(int bucket) throws IgniteCheckedException {
Stripe[] tails = getBucket(bucket);
if (tails == null || bucketsSize[bucket].get() == 0)
return null;
int len = tails.length;
+
+ // Striped pool optimization.
+ int stripeIdx; IgniteThread igniteThread = IgniteThread.current();
+
+ if (igniteThread != null && (stripeIdx = igniteThread.stripe()) != -1) {
+ if (stripeIdx >= len)
+ return null;
+
+ Stripe stripe = tails[stripeIdx];
+
+ return stripe.empty ? null : stripe;
+ }
+
int init = randomInt(len);
int cur = init;
@@ -943,6 +968,12 @@ public abstract class PagesList extends DataStructure {
*/
private long writeLockPage(long pageId, long page, int bucket, int lockAttempt)
throws IgniteCheckedException {
+ // Striped pool optimization.
+ IgniteThread igniteThread = IgniteThread.current();
+
+ if (igniteThread != null && igniteThread.stripe() != -1)
+ return writeLock(pageId, page);
+
long pageAddr = tryWriteLock(pageId, page);
if (pageAddr != 0L)
@@ -952,8 +983,7 @@ public abstract class PagesList extends DataStructure {
Stripe[] stripes = getBucket(bucket);
if (stripes == null || stripes.length < MAX_STRIPES_PER_BUCKET) {
- if (!isReuseBucket(bucket))
- addStripe(bucket, true);
+ addStripe(bucket, !isReuseBucket(bucket));
return 0L;
}
@@ -963,19 +993,6 @@ public abstract class PagesList extends DataStructure {
}
/**
- * @param bucket Bucket.
- * @throws IgniteCheckedException If failed.
- */
- private void addStripeForReuseBucket(int bucket) throws IgniteCheckedException {
- assert isReuseBucket(bucket);
-
- Stripe[] stripes = getBucket(bucket);
-
- if (stripes == null || stripes.length < MAX_STRIPES_PER_BUCKET)
- addStripe(bucket, false);
- }
-
- /**
* @param bucket Bucket index.
* @param initIoVers Optional IO to initialize page.
* @return Removed page ID.
@@ -994,12 +1011,8 @@ public abstract class PagesList extends DataStructure {
try {
long tailAddr = writeLockPage(tailId, tailPage, bucket, lockAttempt++); // Explicit check.
- if (tailAddr == 0L) {
- if (isReuseBucket(bucket) && lockAttempt == TRY_LOCK_ATTEMPTS)
- addStripeForReuseBucket(bucket);
-
+ if (tailAddr == 0L)
continue;
- }
if (stripe.empty) {
// Another thread took the last page.
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 9dfd338..07be8b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -513,7 +513,7 @@ public class StandaloneGridKernalContext implements GridKernalContext {
}
/** {@inheritDoc} */
- @Override public ExecutorService getDataStreamerExecutorService() {
+ @Override public StripedExecutor getDataStreamerExecutorService() {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 6681710..1869dcf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.datastreamer;
+import java.lang.reflect.Array;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
@@ -60,8 +61,10 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -152,7 +155,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
/** */
- private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+ private int parallelOps;
/** */
private long timeout = DFLT_UNLIMIT_TIMEOUT;
@@ -794,6 +797,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
initPda = false;
}
+ if (key.partition() == -1)
+ key.partition(cctx.affinity().partition(key, false));
+
nodes = nodes(key, topVer, cctx);
}
catch (IgniteCheckedException e) {
@@ -935,10 +941,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
};
- final GridFutureAdapter<?> f;
+ final List<GridFutureAdapter<?>> futs;
try {
- f = buf.update(entriesForNode, topVer, lsnr, remap);
+ futs = buf.update(entriesForNode, topVer, lsnr, remap);
}
catch (IgniteInterruptedCheckedException e1) {
resFut.onDone(e1);
@@ -954,9 +960,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
@Override public void run() {
buf0.onNodeLeft();
- if (f != null)
- f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
- "(node has left): " + nodeId));
+ if (futs != null) {
+ Throwable ex = new ClusterTopologyCheckedException(
+ "Failed to wait for request completion (node has left): " + nodeId);
+
+ for (int i = 0; i < futs.size(); i++)
+ futs.get(i).onDone(ex);
+ }
}
}, ctx.discovery().topologyVersion(), false);
}
@@ -1314,11 +1324,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
private final Collection<IgniteInternalFuture<Object>> locFuts;
/** Buffered entries. */
- private List<DataStreamerEntry> entries;
-
- /** */
- @GridToStringExclude
- private GridFutureAdapter<Object> curFut;
+ private final PerStripeBuffer[] stripes;
/** Local node flag. */
private final boolean isLocNode;
@@ -1332,16 +1338,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** */
private final Semaphore sem;
- /** Batch topology. */
- private AffinityTopologyVersion batchTopVer;
+ /** */
+ private final int perNodeParallelOps;
/** Closure to signal on task finish. */
@GridToStringExclude
- private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
- @Override public void apply(IgniteInternalFuture<Object> t) {
- signalTaskFinished(t);
- }
- };
+ private final IgniteInClosure<IgniteInternalFuture<Object>> signalC =
+ new IgniteInClosure<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> t) {
+ signalTaskFinished(t);
+ }
+ };
/**
* @param node Node.
@@ -1357,24 +1364,19 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
// Cache local node flag.
isLocNode = node.equals(ctx.discovery().localNode());
- entries = newEntries();
- curFut = new GridFutureAdapter<>();
- curFut.listen(signalC);
+ Integer attrStreamerPoolSize = node.attribute(IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE);
- sem = new Semaphore(parallelOps);
- }
+ int streamerPoolSize = attrStreamerPoolSize != null ? attrStreamerPoolSize : node.metrics().getTotalCpus();
- /**
- * @param remap Remapping flag.
- */
- private void renewBatch(boolean remap) {
- entries = newEntries();
- curFut = new GridFutureAdapter<>();
+ perNodeParallelOps = parallelOps != 0 ? parallelOps :
+ streamerPoolSize * IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER;
- batchTopVer = null;
+ sem = new Semaphore(perNodeParallelOps);
- if (!remap)
- curFut.listen(signalC);
+ stripes = (PerStripeBuffer[])Array.newInstance(PerStripeBuffer.class, streamerPoolSize);
+
+ for (int i = 0; i < stripes.length; i++)
+ stripes[i] = new PerStripeBuffer(i, signalC);
}
/**
@@ -1385,61 +1387,69 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @return Future for operation.
* @throws IgniteInterruptedCheckedException If failed.
*/
- @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
+ @Nullable List<GridFutureAdapter<?>> update(
+ Iterable<DataStreamerEntry> newEntries,
AffinityTopologyVersion topVer,
IgniteInClosure<IgniteInternalFuture<?>> lsnr,
- boolean remap) throws IgniteInterruptedCheckedException {
- List<DataStreamerEntry> entries0 = null;
+ boolean remap
+ ) throws IgniteInterruptedCheckedException {
+ List<GridFutureAdapter<?>> res = null;
- GridFutureAdapter<Object> curFut0;
+ for (DataStreamerEntry entry : newEntries) {
+ List<DataStreamerEntry> entries0 = null;
+ AffinityTopologyVersion curBatchTopVer;
+
+ // Init buffer.
+ int part = entry.getKey().partition();
- AffinityTopologyVersion curBatchTopVer;
+ GridFutureAdapter<Object> curFut0;
+ PerStripeBuffer b = stripes[part % stripes.length];
- synchronized (this) {
- curFut0 = curFut;
+ synchronized (b) {
+ curFut0 = b.curFut;
- curFut0.listen(lsnr);
+ // Listener should be added only once per whole entries collection.
+ // Should we simplify the model and get rid of all futures?
+ curFut0.listen(lsnr);
- if (batchTopVer == null)
- batchTopVer = topVer;
+ if (b.batchTopVer == null)
+ b.batchTopVer = topVer;
- curBatchTopVer = batchTopVer;
+ curBatchTopVer = b.batchTopVer;
- for (DataStreamerEntry entry : newEntries)
- entries.add(entry);
+ b.entries.add(entry);
- if (entries.size() >= bufSize) {
- entries0 = entries;
+ if (b.entries.size() >= bufSize) {
+ entries0 = b.entries;
- renewBatch(remap);
+ b.renewBatch(remap);
+ }
}
- }
- if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
- renewBatch(remap);
+ if (res == null)
+ res = new ArrayList<>();
- curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." +
- "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]"));
- }
- else if (entries0 != null) {
- submit(entries0, curBatchTopVer, curFut0, remap);
-
- if (cancelled)
- curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
- DataStreamerImpl.this));
- else if (ctx.clientDisconnected())
- curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
- "Client node disconnected."));
- }
+ res.add(curFut0);
- return curFut0;
- }
+ if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
+ b.renewBatch(remap);
- /**
- * @return Fresh collection with some space for outgrowth.
- */
- private List<DataStreamerEntry> newEntries() {
- return new ArrayList<>((int)(bufSize * 1.2));
+ curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." +
+ "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]"));
+ }
+ else if (entries0 != null) {
+ submit(entries0, curBatchTopVer, curFut0, remap, b.partId);
+
+ if (cancelled)
+ curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+ DataStreamerImpl.this));
+ else if (ctx.clientDisconnected())
+ curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Client node disconnected."));
+ }
+ }
+
+ return res;
}
/**
@@ -1447,24 +1457,26 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @throws IgniteInterruptedCheckedException If thread has been interrupted.
*/
@Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
- List<DataStreamerEntry> entries0 = null;
- GridFutureAdapter<Object> curFut0 = null;
-
acquireRemapSemaphore();
- synchronized (this) {
- if (!entries.isEmpty()) {
- entries0 = entries;
- curFut0 = curFut;
+ for (PerStripeBuffer b : stripes) {
+ AffinityTopologyVersion batchTopVer = null;
+ List<DataStreamerEntry> entries0 = null;
+ GridFutureAdapter<Object> curFut0 = null;
+
+ synchronized (b) {
+ if (!b.entries.isEmpty()) {
+ entries0 = b.entries;
+ curFut0 = b.curFut;
+ batchTopVer = b.batchTopVer;
- entries = newEntries();
- curFut = new GridFutureAdapter<>();
- curFut.listen(signalC);
+ b.renewBatch(false);
+ }
}
- }
- if (entries0 != null)
- submit(entries0, batchTopVer, curFut0, false);
+ if (entries0 != null)
+ submit(entries0, batchTopVer, curFut0, false, b.partId);
+ }
// Create compound future for this flush.
GridCompoundFuture<Object, Object> res = null;
@@ -1618,13 +1630,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @param topVer Topology version.
* @param curFut Current future.
* @param remap Remapping flag.
+ * @param partId Partition ID.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- private void submit(final Collection<DataStreamerEntry> entries,
+ private void submit(
+ final Collection<DataStreamerEntry> entries,
@Nullable AffinityTopologyVersion topVer,
final GridFutureAdapter<Object> curFut,
- boolean remap)
- throws IgniteInterruptedCheckedException {
+ boolean remap,
+ int partId
+ ) throws IgniteInterruptedCheckedException {
assert entries != null;
assert !entries.isEmpty();
assert curFut != null;
@@ -1685,7 +1700,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
cache.context().deploy().onEnter();
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
+ U.error(log, "Failed to deploy class (request will not be sent): " +
+ jobPda0.deployClass(), e);
return;
}
@@ -1718,7 +1734,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
dep != null ? dep.participants() : null,
dep != null ? dep.classLoaderId() : null,
dep == null,
- topVer);
+ topVer,
+ (rcvr == ISOLATED_UPDATER) ?
+ partId : GridIoMessage.STRIPE_DISABLED_PART);
try {
ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req, plc);
@@ -1767,11 +1785,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
// Make sure to complete current future.
GridFutureAdapter<Object> curFut0;
- synchronized (this) {
- curFut0 = curFut;
- }
+ for (PerStripeBuffer b : stripes) {
+ synchronized (b) {
+ curFut0 = b.curFut;
+ }
- curFut0.onDone(e);
+ curFut0.onDone(e);
+ }
}
/**
@@ -1845,10 +1865,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** {@inheritDoc} */
@Override public String toString() {
- int size;
+ int size = 0;
- synchronized (this) {
- size = entries.size();
+ for (int i = 0; i < stripes.length; i++) {
+ PerStripeBuffer b = stripes[i];
+
+ synchronized (b) {
+ size += b.entries.size();
+ }
}
return S.toString(Buffer.class, this,
@@ -1937,8 +1961,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache,
- Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) {
+ @Override public void receive(
+ IgniteCache<KeyCacheObject, CacheObject> cache,
+ Collection<Map.Entry<KeyCacheObject, CacheObject>> entries
+ ) {
IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache;
GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache();
@@ -2097,4 +2123,63 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
return S.toString(KeyCacheObjectWrapper.class, this);
}
}
+
+ /**
+ *
+ */
+ private class PerStripeBuffer {
+ /** */
+ private final int partId;
+
+ /** */
+ private List<DataStreamerEntry> entries;
+
+ /** */
+ private GridFutureAdapter<Object> curFut;
+
+ /** Batch topology. */
+ private AffinityTopologyVersion batchTopVer;
+
+ /** */
+ private final IgniteInClosure<? super IgniteInternalFuture<Object>> signalC;
+
+ /**
+ * @param partId Partition ID.
+ * @param c Signal closure.
+ */
+ public PerStripeBuffer(
+ int partId,
+ IgniteInClosure<? super IgniteInternalFuture<Object>> c
+ ) {
+ this.partId = partId;
+ signalC = c;
+
+ renewBatch(false);
+ }
+
+ /**
+ * @param remap Remap.
+ */
+ synchronized void renewBatch(boolean remap) {
+ entries = newEntries();
+ curFut = new GridFutureAdapter<>();
+
+ batchTopVer = null;
+
+ if (!remap)
+ curFut.listen(signalC);
+ }
+
+ /**
+ * @return Fresh collection with some space for outgrowth.
+ */
+ private List<DataStreamerEntry> newEntries() {
+ return new ArrayList<>((int)(bufSize * 1.2));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PerStripeBuffer.class, this, super.toString());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
index b4cbf66..f70ee9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -90,6 +90,9 @@ public class DataStreamerRequest implements Message {
/** Topology version. */
private AffinityTopologyVersion topVer;
+ /** */
+ private int partId;
+
/**
* {@code Externalizable} support.
*/
@@ -113,8 +116,10 @@ public class DataStreamerRequest implements Message {
* @param clsLdrId Class loader ID.
* @param forceLocDep Force local deployment.
* @param topVer Topology version.
+ * @param partId Partition ID.
*/
- public DataStreamerRequest(long reqId,
+ public DataStreamerRequest(
+ long reqId,
byte[] resTopicBytes,
@Nullable String cacheName,
byte[] updaterBytes,
@@ -128,7 +133,9 @@ public class DataStreamerRequest implements Message {
Map<UUID, IgniteUuid> ldrParticipants,
IgniteUuid clsLdrId,
boolean forceLocDep,
- @NotNull AffinityTopologyVersion topVer) {
+ @NotNull AffinityTopologyVersion topVer,
+ int partId
+ ) {
assert topVer != null;
this.reqId = reqId;
@@ -146,6 +153,7 @@ public class DataStreamerRequest implements Message {
this.clsLdrId = clsLdrId;
this.forceLocDep = forceLocDep;
this.topVer = topVer;
+ this.partId = partId;
}
/**
@@ -253,6 +261,13 @@ public class DataStreamerRequest implements Message {
return topVer;
}
+ /**
+ * @return Partition ID.
+ */
+ public int partition() {
+ return partId;
+ }
+
/** {@inheritDoc} */
@Override public void onAckReceived() {
// No-op.
@@ -324,42 +339,48 @@ public class DataStreamerRequest implements Message {
writer.incrementState();
case 8:
- if (!writer.writeLong("reqId", reqId))
+ if (!writer.writeInt("partId", partId))
return false;
writer.incrementState();
case 9:
- if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
+ if (!writer.writeLong("reqId", reqId))
return false;
writer.incrementState();
case 10:
- if (!writer.writeString("sampleClsName", sampleClsName))
+ if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
return false;
writer.incrementState();
case 11:
- if (!writer.writeBoolean("skipStore", skipStore))
+ if (!writer.writeString("sampleClsName", sampleClsName))
return false;
writer.incrementState();
case 12:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeBoolean("skipStore", skipStore))
return false;
writer.incrementState();
case 13:
- if (!writer.writeByteArray("updaterBytes", updaterBytes))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
case 14:
+ if (!writer.writeByteArray("updaterBytes", updaterBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 15:
if (!writer.writeString("userVer", userVer))
return false;
@@ -447,7 +468,7 @@ public class DataStreamerRequest implements Message {
reader.incrementState();
case 8:
- reqId = reader.readLong("reqId");
+ partId = reader.readInt("partId");
if (!reader.isLastRead())
return false;
@@ -455,7 +476,7 @@ public class DataStreamerRequest implements Message {
reader.incrementState();
case 9:
- resTopicBytes = reader.readByteArray("resTopicBytes");
+ reqId = reader.readLong("reqId");
if (!reader.isLastRead())
return false;
@@ -463,7 +484,7 @@ public class DataStreamerRequest implements Message {
reader.incrementState();
case 10:
- sampleClsName = reader.readString("sampleClsName");
+ resTopicBytes = reader.readByteArray("resTopicBytes");
if (!reader.isLastRead())
return false;
@@ -471,7 +492,7 @@ public class DataStreamerRequest implements Message {
reader.incrementState();
case 11:
- skipStore = reader.readBoolean("skipStore");
+ sampleClsName = reader.readString("sampleClsName");
if (!reader.isLastRead())
return false;
@@ -479,7 +500,7 @@ public class DataStreamerRequest implements Message {
reader.incrementState();
case 12:
- topVer = reader.readMessage("topVer");
+ skipStore = reader.readBoolean("skipStore");
if (!reader.isLastRead())
return false;
@@ -487,7 +508,7 @@ public class DataStreamerRequest implements Message {
reader.incrementState();
case 13:
- updaterBytes = reader.readByteArray("updaterBytes");
+ topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
return false;
@@ -495,6 +516,14 @@ public class DataStreamerRequest implements Message {
reader.incrementState();
case 14:
+ updaterBytes = reader.readByteArray("updaterBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 15:
userVer = reader.readString("userVer");
if (!reader.isLastRead())
@@ -514,6 +543,6 @@ public class DataStreamerRequest implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 15;
+ return 16;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 6d5dc71..630d34c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.util;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Deque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -36,6 +38,8 @@ import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -62,6 +66,17 @@ public class StripedExecutor implements ExecutorService {
* @param log Logger.
*/
public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log) {
+ this(cnt, igniteInstanceName, poolName, log, false);
+ }
+
+ /**
+ * @param cnt Count.
+ * @param igniteInstanceName Node name.
+ * @param poolName Pool name.
+ * @param log Logger.
+ * @param stealTasks {@code True} to steal tasks.
+ */
+ public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, boolean stealTasks) {
A.ensure(cnt > 0, "cnt > 0");
boolean success = false;
@@ -76,14 +91,19 @@ public class StripedExecutor implements ExecutorService {
try {
for (int i = 0; i < cnt; i++) {
- stripes[i] = new StripeConcurrentQueue(
+ stripes[i] = stealTasks ? new StripeConcurrentQueue(
igniteInstanceName,
poolName,
i,
- log);
+ log, stripes) : new StripeConcurrentQueue(
+ igniteInstanceName,
+ poolName,
+ i,
+ log);
+ }
+ for (int i = 0; i < cnt; i++)
stripes[i].start();
- }
success = true;
}
@@ -397,7 +417,7 @@ public class StripedExecutor implements ExecutorService {
private final String poolName;
/** */
- private final int idx;
+ protected final int idx;
/** */
private final IgniteLogger log;
@@ -536,8 +556,17 @@ public class StripedExecutor implements ExecutorService {
* Stripe.
*/
private static class StripeConcurrentQueue extends Stripe {
+ /** */
+ private static final int IGNITE_TASKS_STEALING_THRESHOLD =
+ IgniteSystemProperties.getInteger(
+ IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD, 4);
+
/** Queue. */
- private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
+ private final Queue<Runnable> queue;
+
+ /** */
+ @GridToStringExclude
+ private final Stripe[] others;
/** */
private volatile boolean parked;
@@ -548,16 +577,37 @@ public class StripedExecutor implements ExecutorService {
* @param idx Stripe index.
* @param log Logger.
*/
- public StripeConcurrentQueue(
+ StripeConcurrentQueue(
String igniteInstanceName,
String poolName,
int idx,
IgniteLogger log
) {
- super(igniteInstanceName,
+ this(igniteInstanceName, poolName, idx, log, null);
+ }
+
+ /**
+ * @param igniteInstanceName Ignite instance name.
+ * @param poolName Pool name.
+ * @param idx Stripe index.
+ * @param log Logger.
+ */
+ StripeConcurrentQueue(
+ String igniteInstanceName,
+ String poolName,
+ int idx,
+ IgniteLogger log,
+ Stripe[] others
+ ) {
+ super(
+ igniteInstanceName,
poolName,
idx,
log);
+
+ this.others = others;
+
+ this.queue = others == null ? new ConcurrentLinkedQueue<Runnable>() : new ConcurrentLinkedDeque<Runnable>();
}
/** {@inheritDoc} */
@@ -580,6 +630,24 @@ public class StripedExecutor implements ExecutorService {
if (r != null)
return r;
+ if(others != null) {
+ int len = others.length;
+ int init = ThreadLocalRandom.current().nextInt(len);
+ int cur = init;
+
+ while (true) {
+ if(cur != idx) {
+ Deque<Runnable> queue = (Deque<Runnable>) ((StripeConcurrentQueue) others[cur]).queue;
+
+ if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (r = queue.pollLast()) != null)
+ return r;
+ }
+
+ if ((cur = (cur + 1) % len) == init)
+ break;
+ }
+ }
+
LockSupport.park();
if (Thread.interrupted())
@@ -597,6 +665,13 @@ public class StripedExecutor implements ExecutorService {
if (parked)
LockSupport.unpark(thread);
+
+ if(others != null && queueSize() > IGNITE_TASKS_STEALING_THRESHOLD) {
+ for (Stripe other : others) {
+ if(((StripeConcurrentQueue)other).parked)
+ LockSupport.unpark(other.thread);
+ }
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index 83a0384..b8a91a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -158,6 +158,16 @@ public class IgniteThread extends Thread {
}
/**
+ * @return IgniteThread or {@code null} if current thread is not an instance of IgniteThread.
+ */
+ public static IgniteThread current(){
+ Thread thread = Thread.currentThread();
+
+ return thread.getClass() == IgniteThread.class || thread instanceof IgniteThread ?
+ ((IgniteThread)thread) : null;
+ }
+
+ /**
* Creates new thread name.
*
* @param num Thread number.
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index ec5e6d0..ac89021 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -962,8 +962,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
- IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME);
- try {
+ try (IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
ldr.receiver(new StreamReceiver<String, String>() {
@Override public void receive(IgniteCache<String, String> cache,
Collection<Map.Entry<String, String>> entries) throws IgniteException {
@@ -972,6 +971,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
cache.put("key", threadName);
}
});
+
ldr.addData("key", "value");
ldr.tryFlush();
@@ -982,9 +982,6 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
}
}, 3_000);
}
- finally {
- ldr.close(true);
- }
assertNotNull(cache.get("key"));
@@ -1011,9 +1008,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
- IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME);
-
- try {
+ try (IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME)) {
ldr.receiver(new StringStringStreamReceiver());
ldr.addData("key", "value");
@@ -1026,9 +1021,6 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
}
}, 3_000);
}
- finally {
- ldr.close(true);
- }
assertNotNull(cache.get("key"));
http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index e72a9b4..6d3466b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -346,7 +346,8 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
req.participants(),
req.classLoaderId(),
req.forceLocalDeployment(),
- staleTop);
+ staleTop,
+ -1);
msg = new GridIoMessage(
GridTestUtils.<Byte>getFieldValue(ioMsg, "plc"),