You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/28 09:58:26 UTC
ignite git commit: IGNITE-3875: Introduced separate thread pool for
data streamer. This closes #1173. This closes #1383.
Repository: ignite
Updated Branches:
refs/heads/ignite-2.0 7e73d0223 -> 7d82d6a06
IGNITE-3875: Introduced separate thread pool for data streamer. This closes #1173. This closes #1383.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d82d6a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d82d6a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d82d6a0
Branch: refs/heads/ignite-2.0
Commit: 7d82d6a06b5e9f1f8cd2909b865e37d46b8da03f
Parents: 7e73d02
Author: devozerov <vo...@gridgain.com>
Authored: Wed Dec 28 12:58:11 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Dec 28 12:58:11 2016 +0300
----------------------------------------------------------------------
.../configuration/IgniteConfiguration.java | 31 ++++++
.../ignite/internal/GridKernalContext.java | 7 ++
.../ignite/internal/GridKernalContextImpl.java | 12 +++
.../apache/ignite/internal/IgniteKernal.java | 3 +
.../org/apache/ignite/internal/IgnitionEx.java | 19 ++++
.../managers/communication/GridIoManager.java | 2 +
.../managers/communication/GridIoPolicy.java | 3 +
.../closure/GridClosureProcessor.java | 2 +-
.../datastreamer/DataStreamProcessor.java | 60 ++++++++---
.../datastreamer/DataStreamerImpl.java | 37 ++-----
.../internal/processors/pool/PoolProcessor.java | 5 +
.../DataStreamProcessorSelfTest.java | 104 +++++++++++++++++++
.../junits/GridTestKernalContext.java | 12 +--
13 files changed, 249 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index dcd8a80..e0ff9b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -148,6 +148,9 @@ public class IgniteConfiguration {
/** Default core size of public thread pool. */
public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT);
+ /** Default size of data streamer thread pool. */
+ public static final int DFLT_DATA_STREAMER_POOL_SIZE = DFLT_PUBLIC_THREAD_CNT;
+
/** Default keep alive time for public thread pool. */
@Deprecated
public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
@@ -251,6 +254,9 @@ public class IgniteConfiguration {
/** IGFS pool size. */
private int igfsPoolSize = AVAILABLE_PROC_CNT;
+ /** Data stream pool size. */
+ private int dataStreamerPoolSize = DFLT_DATA_STREAMER_POOL_SIZE;
+
/** Utility cache pool size. */
private int utilityCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
@@ -514,6 +520,7 @@ public class IgniteConfiguration {
clockSyncFreq = cfg.getClockSyncFrequency();
clockSyncSamples = cfg.getClockSyncSamples();
consistentId = cfg.getConsistentId();
+ dataStreamerPoolSize = cfg.getDataStreamerThreadPoolSize();
deployMode = cfg.getDeploymentMode();
discoStartupDelay = cfg.getDiscoveryStartupDelay();
failureDetectionTimeout = cfg.getFailureDetectionTimeout();
@@ -837,6 +844,17 @@ public class IgniteConfiguration {
}
/**
+ * Size of thread pool that is in charge of processing data stream messages.
+ * <p>
+ * If not provided, executor service will have size {@link #DFLT_DATA_STREAMER_POOL_SIZE}.
+ *
+ * @return Thread pool size to be used for data stream messages.
+ */
+ public int getDataStreamerThreadPoolSize() {
+ return dataStreamerPoolSize;
+ }
+
+ /**
* Default size of thread pool that is in charge of processing utility cache messages.
* <p>
* If not provided, executor service will have size {@link #DFLT_SYSTEM_CORE_THREAD_CNT}.
@@ -960,6 +978,19 @@ public class IgniteConfiguration {
}
/**
+ * Set thread pool size that will be used to process data stream messages.
+ *
+ * @param poolSize Executor service to use for data stream messages.
+ * @see IgniteConfiguration#getDataStreamerThreadPoolSize()
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setDataStreamerThreadPoolSize(int poolSize) {
+ dataStreamerPoolSize = poolSize;
+
+ return this;
+ }
+
+ /**
* Sets default thread pool size that will be used to process utility cache messages.
*
* @param poolSize Default executor service size to use for utility cache messages.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 927944f..9157fed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -542,6 +542,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public ExecutorService getIgfsExecutorService();
/**
+ * Executor service that is in charge of processing data stream messages.
+ *
+ * @return Thread pool implementation to be used for data stream messages.
+ */
+ public ExecutorService getDataStreamerExecutorService();
+
+ /**
* Should return an instance of fully configured thread pool to be used for
* processing of client messages (REST requests).
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index a2ad1b2..8fc5b36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -317,6 +317,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringExclude
+ private ExecutorService dataStreamExecSvc;
+
+ /** */
+ @GridToStringExclude
protected ExecutorService restExecSvc;
/** */
@@ -390,6 +394,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
* @param p2pExecSvc P2P executor service.
* @param mgmtExecSvc Management executor service.
* @param igfsExecSvc IGFS executor service.
+ * @param dataStreamExecSvc data stream executor service.
* @param restExecSvc REST executor service.
* @param affExecSvc Affinity executor service.
* @param idxExecSvc Indexing executor service.
@@ -410,6 +415,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
ExecutorService p2pExecSvc,
ExecutorService mgmtExecSvc,
ExecutorService igfsExecSvc,
+ ExecutorService dataStreamExecSvc,
ExecutorService restExecSvc,
ExecutorService affExecSvc,
@Nullable ExecutorService idxExecSvc,
@@ -431,6 +437,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
this.p2pExecSvc = p2pExecSvc;
this.mgmtExecSvc = mgmtExecSvc;
this.igfsExecSvc = igfsExecSvc;
+ this.dataStreamExecSvc = dataStreamExecSvc;
this.restExecSvc = restExecSvc;
this.affExecSvc = affExecSvc;
this.idxExecSvc = idxExecSvc;
@@ -977,6 +984,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public ExecutorService getDataStreamerExecutorService() {
+ return dataStreamExecSvc;
+ }
+
+ /** {@inheritDoc} */
@Override public ExecutorService getRestExecutorService() {
return restExecSvc;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 4972d1f..99c3dab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -668,6 +668,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
* @param p2pExecSvc P2P executor service.
* @param mgmtExecSvc Management executor service.
* @param igfsExecSvc IGFS executor service.
+ * @param dataStreamExecSvc data stream executor service.
* @param restExecSvc Reset executor service.
* @param affExecSvc Affinity executor service.
* @param idxExecSvc Indexing executor service.
@@ -685,6 +686,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ExecutorService p2pExecSvc,
ExecutorService mgmtExecSvc,
ExecutorService igfsExecSvc,
+ ExecutorService dataStreamExecSvc,
ExecutorService restExecSvc,
ExecutorService affExecSvc,
@Nullable ExecutorService idxExecSvc,
@@ -794,6 +796,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
p2pExecSvc,
mgmtExecSvc,
igfsExecSvc,
+ dataStreamExecSvc,
restExecSvc,
affExecSvc,
idxExecSvc,
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index f32a753..9fe6fd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1472,6 +1472,9 @@ public class IgnitionEx {
/** IGFS executor service. */
private ThreadPoolExecutor igfsExecSvc;
+ /** Data streamer executor service. */
+ private ThreadPoolExecutor dataStreamerExecSvc;
+
/** REST requests executor service. */
private ThreadPoolExecutor restExecSvc;
@@ -1702,6 +1705,17 @@ public class IgnitionEx {
p2pExecSvc.allowCoreThreadTimeOut(true);
+ // Note that we do not pre-start threads here as this pool may not be needed.
+ dataStreamerExecSvc = new IgniteThreadPoolExecutor(
+ "data-streamer",
+ cfg.getGridName(),
+ cfg.getDataStreamerThreadPoolSize(),
+ cfg.getDataStreamerThreadPoolSize(),
+ DFLT_THREAD_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<Runnable>());
+
+ dataStreamerExecSvc.allowCoreThreadTimeOut(true);
+
// Note that we do not pre-start threads here as igfs pool may not be needed.
validateThreadPoolSize(cfg.getIgfsThreadPoolSize(), "IGFS");
@@ -1806,6 +1820,7 @@ public class IgnitionEx {
p2pExecSvc,
mgmtExecSvc,
igfsExecSvc,
+ dataStreamerExecSvc,
restExecSvc,
affExecSvc,
idxExecSvc,
@@ -2445,6 +2460,10 @@ public class IgnitionEx {
p2pExecSvc = null;
+ U.shutdownNow(getClass(), dataStreamerExecSvc, log);
+
+ dataStreamerExecSvc = null;
+
U.shutdownNow(getClass(), igfsExecSvc, log);
igfsExecSvc = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 7ef7bc0..de34adb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -90,6 +90,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
@@ -686,6 +687,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
case MARSH_CACHE_POOL:
case IDX_POOL:
case IGFS_POOL:
+ case DATA_STREAMER_POOL:
{
if (msg.isOrdered())
processOrderedMessage(nodeId, msg, plc, msgC);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 70a7354..18235d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -49,6 +49,9 @@ public class GridIoPolicy {
/** Pool for handling distributed index range requests. */
public static final byte IDX_POOL = 8;
+ /** Data streamer execution pool. */
+ public static final byte DATA_STREAMER_POOL = 9;
+
/**
* Defines the range of reserved pools that are not available for plugins.
* @param key The key.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index a07dbf8..5ba21d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -988,7 +988,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param plc Policy to choose executor pool.
* @return Future.
*/
- private <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) {
+ public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) {
try {
return callLocal(c, plc);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index fee4dd6..5ebfd47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -17,11 +17,8 @@
package org.apache.ignite.internal.processors.datastreamer;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.DelayQueue;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -40,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.Marshaller;
@@ -47,11 +45,15 @@ import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.DelayQueue;
+
import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DATA_STREAMER_POOL;
/**
- *
+ * Data stream processor.
*/
public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
/** Loaders map (access is not supposed to be highly concurrent). */
@@ -224,13 +226,15 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer);
if (fut != null && !fut.isDone()) {
+ final byte plc = threadIoPolicy();
+
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> t) {
ctx.closure().runLocalSafe(new Runnable() {
@Override public void run() {
processRequest(nodeId, req);
}
- }, false);
+ }, plc);
}
});
@@ -416,12 +420,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
try {
- Byte plc = GridIoManager.currentPolicy();
-
- if (plc == null)
- plc = PUBLIC_POOL;
-
- ctx.io().send(nodeId, resTopic, res, plc);
+ ctx.io().send(nodeId, resTopic, res, threadIoPolicy());
}
catch (IgniteCheckedException e) {
if (ctx.discovery().alive(nodeId))
@@ -431,6 +430,41 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
}
}
+ /**
+ * Get IO policy.
+ *
+ * @return IO policy.
+ */
+ private static byte threadIoPolicy() {
+ Byte plc = GridIoManager.currentPolicy();
+
+ if (plc == null)
+ plc = DATA_STREAMER_POOL;
+
+ return plc;
+ }
+
+ /**
+ * Get IO policy for particular node with provided resolver.
+ *
+ * @param rslvr Resolver.
+ * @param node Node.
+ * @return IO policy.
+ */
+ public static byte ioPolicy(@Nullable IgniteClosure<ClusterNode, Byte> rslvr, ClusterNode node) {
+ assert node != null;
+
+ Byte res = null;
+
+ if (rslvr != null)
+ res = rslvr.apply(node);
+
+ if (res == null)
+ res = DATA_STREAMER_POOL;
+
+ return res;
+ }
+
/** {@inheritDoc} */
@Override public void printMemoryStats() {
X.println(">>>");
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index bb9ffdd..0526162 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -112,16 +111,12 @@ import org.jsr166.LongAdder8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
/**
* Data streamer implementation.
*/
@SuppressWarnings("unchecked")
public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
- /** Default policy resolver. */
- private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new DefaultIoPolicyResolver();
-
/** Isolated receiver. */
private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater();
@@ -135,7 +130,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
private byte[] updaterBytes;
/** IO policy resovler for data load request. */
- private IgniteClosure<ClusterNode, Byte> ioPlcRslvr = DFLT_IO_PLC_RSLVR;
+ private IgniteClosure<ClusterNode, Byte> ioPlcRslvr;
/** Max remap count before issuing an error. */
private static final int DFLT_MAX_REMAP_CNT = 32;
@@ -1509,10 +1504,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @param entries Entries.
* @param reqTopVer Request topology version.
* @param curFut Current future.
+ * @param plc Policy.
*/
private void localUpdate(final Collection<DataStreamerEntry> entries,
final AffinityTopologyVersion reqTopVer,
- final GridFutureAdapter<Object> curFut) {
+ final GridFutureAdapter<Object> curFut,
+ final byte plc) {
try {
GridCacheContext cctx = ctx.cache().internalCache(cacheName).context();
@@ -1543,7 +1540,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
skipStore,
keepBinary,
rcvr),
- false);
+ plc);
locFuts.add(callFut);
@@ -1573,7 +1570,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
else {
fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
- localUpdate(entries, reqTopVer, curFut);
+ localUpdate(entries, reqTopVer, curFut, plc);
}
});
}
@@ -1617,13 +1614,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
IgniteInternalFuture<Object> fut;
- Byte plc = ioPlcRslvr.apply(node);
-
- if (plc == null)
- plc = PUBLIC_POOL;
+ byte plc = DataStreamProcessor.ioPolicy(ioPlcRslvr, node);
- if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL)
- localUpdate(entries, topVer, curFut);
+ if (isLocNode)
+ localUpdate(entries, topVer, curFut, plc);
else {
try {
for (DataStreamerEntry e : entries) {
@@ -1975,19 +1969,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
/**
- * Default IO policy resolver.
- */
- private static class DefaultIoPolicyResolver implements IgniteClosure<ClusterNode, Byte> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public Byte apply(ClusterNode gridNode) {
- return PUBLIC_POOL;
- }
- }
-
- /**
* Key object wrapper. Using identity equals prevents slow down in case of hash code collision.
*/
private static class KeyCacheObjectWrapper {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 59e5e7d..89140b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -128,6 +128,11 @@ public class PoolProcessor extends GridProcessorAdapter {
return ctx.getIgfsExecutorService();
+ case GridIoPolicy.DATA_STREAMER_POOL:
+ assert ctx.getDataStreamerExecutorService() != null : "Data streamer pool is not configured.";
+
+ return ctx.getDataStreamerExecutorService();
+
default: {
if (plc < 0)
throw new IgniteCheckedException("Policy cannot be negative: " + plc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index 0f8ae29..d00e08b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -33,6 +33,7 @@ import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
@@ -49,6 +50,7 @@ import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
@@ -59,6 +61,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.stream.StreamReceiver;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -949,6 +952,94 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testLocalDataStreamerDedicatedThreadPool() throws Exception {
+ try {
+ useCache = true;
+
+ Ignite ignite = startGrid(1);
+
+ final IgniteCache<String, String> cache = ignite.cache(null);
+
+ IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(null);
+ try {
+ ldr.receiver(new StreamReceiver<String, String>() {
+ @Override public void receive(IgniteCache<String, String> cache,
+ Collection<Map.Entry<String, String>> entries) throws IgniteException {
+ String threadName = Thread.currentThread().getName();
+
+ cache.put("key", threadName);
+ }
+ });
+ ldr.addData("key", "value");
+
+ ldr.tryFlush();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return cache.get("key") != null;
+ }
+ }, 3_000);
+ }
+ finally {
+ ldr.close(true);
+ }
+
+ assertNotNull(cache.get("key"));
+
+ assertTrue(cache.get("key").startsWith("data-streamer"));
+
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoteDataStreamerDedicatedThreadPool() throws Exception {
+ try {
+ useCache = true;
+
+ Ignite ignite = startGrid(1);
+
+ useCache = false;
+
+ Ignite client = startGrid(0);
+
+ final IgniteCache<String, String> cache = ignite.cache(null);
+
+ IgniteDataStreamer<String, String> ldr = client.dataStreamer(null);
+
+ try {
+ ldr.receiver(new StringStringStreamReceiver());
+
+ ldr.addData("key", "value");
+
+ ldr.tryFlush();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return cache.get("key") != null;
+ }
+ }, 3_000);
+ }
+ finally {
+ ldr.close(true);
+ }
+
+ assertNotNull(cache.get("key"));
+
+ assertTrue(cache.get("key").startsWith("data-streamer"));
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
*
*/
public static class TestObject {
@@ -1024,4 +1115,17 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
}
}
}
+
+ /**
+ *
+ */
+ private static class StringStringStreamReceiver implements StreamReceiver<String, String> {
+ /** {@inheritDoc} */
+ @Override public void receive(IgniteCache<String, String> cache,
+ Collection<Map.Entry<String, String>> entries) throws IgniteException {
+ String threadName = Thread.currentThread().getName();
+
+ cache.put("key", threadName);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d82d6a0/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index 143159d..40f0e43 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -64,6 +64,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
null,
null,
null,
+ null,
U.allPluginProviders()
);
@@ -98,11 +99,6 @@ public class GridTestKernalContext extends GridKernalContextImpl {
}
}
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridTestKernalContext.class, this, super.toString());
- }
-
/**
* Sets system executor service.
*
@@ -112,7 +108,6 @@ public class GridTestKernalContext extends GridKernalContextImpl {
this.sysExecSvc = sysExecSvc;
}
-
/**
* Sets executor service.
*
@@ -121,4 +116,9 @@ public class GridTestKernalContext extends GridKernalContextImpl {
public void setExecutorService(ExecutorService execSvc){
this.execSvc = execSvc;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridTestKernalContext.class, this, super.toString());
+ }
}