You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/01 09:27:14 UTC
[29/50] [abbrv] ignite git commit: Added related GridIoPolicy in IgniteThread.
Added related GridIoPolicy in IgniteThread.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a3650fb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a3650fb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a3650fb
Branch: refs/heads/ignite-5757
Commit: 3a3650fb2eadcd6a1186cc186fa5d471cd1e74d5
Parents: 2574beb
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 27 13:57:11 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 27 13:57:11 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/IgnitionEx.java | 34 +++++++++++++------
.../managers/communication/GridIoPolicy.java | 3 ++
.../service/GridServiceProcessor.java | 2 +-
.../util/StripedCompositeReadWriteLock.java | 6 ++--
.../ignite/internal/util/StripedExecutor.java | 4 ++-
.../org/apache/ignite/thread/IgniteThread.java | 35 ++++++++++----------
.../ignite/thread/IgniteThreadFactory.java | 15 ++++++---
.../ignite/thread/IgniteThreadPoolExecutor.java | 33 +++++++++++++++++-
.../GridThreadPoolExecutorServiceSelfTest.java | 2 +-
9 files changed, 95 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 1139ec6..23baeb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -64,6 +64,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.igfs.IgfsThreadFactory;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
@@ -1694,7 +1695,8 @@ public class IgnitionEx {
cfg.getPublicThreadPoolSize(),
cfg.getPublicThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>());
+ new LinkedBlockingQueue<Runnable>(),
+ GridIoPolicy.PUBLIC_POOL);
execSvc.allowCoreThreadTimeOut(true);
@@ -1706,7 +1708,8 @@ public class IgnitionEx {
cfg.getServiceThreadPoolSize(),
cfg.getServiceThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>());
+ new LinkedBlockingQueue<Runnable>(),
+ GridIoPolicy.SERVICE_POOL);
svcExecSvc.allowCoreThreadTimeOut(true);
@@ -1718,7 +1721,8 @@ public class IgnitionEx {
cfg.getSystemThreadPoolSize(),
cfg.getSystemThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>());
+ new LinkedBlockingQueue<Runnable>(),
+ GridIoPolicy.SYSTEM_POOL);
sysExecSvc.allowCoreThreadTimeOut(true);
@@ -1738,7 +1742,8 @@ public class IgnitionEx {
cfg.getManagementThreadPoolSize(),
cfg.getManagementThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>());
+ new LinkedBlockingQueue<Runnable>(),
+ GridIoPolicy.MANAGEMENT_POOL);
mgmtExecSvc.allowCoreThreadTimeOut(true);
@@ -1753,7 +1758,8 @@ public class IgnitionEx {
cfg.getPeerClassLoadingThreadPoolSize(),
cfg.getPeerClassLoadingThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>());
+ new LinkedBlockingQueue<Runnable>(),
+ GridIoPolicy.P2P_POOL);
p2pExecSvc.allowCoreThreadTimeOut(true);
@@ -1764,7 +1770,8 @@ public class IgnitionEx {
cfg.getDataStreamerThreadPoolSize(),
cfg.getDataStreamerThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>());
+ new LinkedBlockingQueue<Runnable>(),
+ GridIoPolicy.DATA_STREAMER_POOL);
dataStreamerExecSvc.allowCoreThreadTimeOut(true);
@@ -1811,7 +1818,8 @@ public class IgnitionEx {
myCfg.getUtilityCacheThreadPoolSize(),
myCfg.getUtilityCacheThreadPoolSize(),
myCfg.getUtilityCacheKeepAliveTime(),
- new LinkedBlockingQueue<Runnable>());
+ new LinkedBlockingQueue<Runnable>(),
+ GridIoPolicy.UTILITY_CACHE_POOL);
utilityCacheExecSvc.allowCoreThreadTimeOut(true);
@@ -1821,7 +1829,8 @@ public class IgnitionEx {
1,
1,
DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>());
+ new LinkedBlockingQueue<Runnable>(),
+ GridIoPolicy.AFFINITY_POOL);
affExecSvc.allowCoreThreadTimeOut(true);
@@ -1834,7 +1843,8 @@ public class IgnitionEx {
cpus,
cpus * 2,
3000L,
- new LinkedBlockingQueue<Runnable>(1000)
+ new LinkedBlockingQueue<Runnable>(1000),
+ GridIoPolicy.IDX_POOL
);
}
@@ -1846,7 +1856,8 @@ public class IgnitionEx {
cfg.getQueryThreadPoolSize(),
cfg.getQueryThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>());
+ new LinkedBlockingQueue<Runnable>(),
+ GridIoPolicy.QUERY_POOL);
qryExecSvc.allowCoreThreadTimeOut(true);
@@ -1856,7 +1867,8 @@ public class IgnitionEx {
2,
2,
DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>());
+ new LinkedBlockingQueue<Runnable>(),
+ GridIoPolicy.SCHEMA_POOL);
schemaExecSvc.allowCoreThreadTimeOut(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 13bc4c4..3f31f92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -22,6 +22,9 @@ package org.apache.ignite.internal.managers.communication;
* message processing by the communication manager.
*/
public class GridIoPolicy {
+ /** */
+ public static final byte UNDEFINED = -1;
+
/** Public execution pool. */
public static final byte PUBLIC_POOL = 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index db632ec..46fcfea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -153,7 +153,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
private volatile GridSpinBusyLock busyLock = new GridSpinBusyLock();
/** Thread factory. */
- private ThreadFactory threadFactory = new IgniteThreadFactory(ctx.igniteInstanceName());
+ private ThreadFactory threadFactory = new IgniteThreadFactory(ctx.igniteInstanceName(), "service");
/** Thread local for service name. */
private ThreadLocal<String> svcName = new ThreadLocal<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
index e215663..18ef06c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
@@ -67,8 +67,10 @@ public class StripedCompositeReadWriteLock implements ReadWriteLock {
@NotNull @Override public Lock readLock() {
int idx;
- if (Thread.currentThread() instanceof IgniteThread) {
- IgniteThread igniteThread = (IgniteThread)Thread.currentThread();
+ Thread curThread = Thread.currentThread();
+
+ if (curThread instanceof IgniteThread) {
+ IgniteThread igniteThread = (IgniteThread)curThread;
idx = igniteThread.compositeRwLockIndex();
http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 6c85b32..6d5dc71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -439,7 +440,8 @@ public class StripedExecutor implements ExecutorService {
poolName + "-stripe-" + idx,
this,
IgniteThread.GRP_IDX_UNASSIGNED,
- idx);
+ idx,
+ GridIoPolicy.UNDEFINED);
thread.start();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index 6005ac9..c814625 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -18,6 +18,7 @@
package org.apache.ignite.thread;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.worker.GridWorker;
@@ -52,13 +53,16 @@ public class IgniteThread extends Thread {
/** */
private final int stripe;
+ /** */
+ private final byte plc;
+
/**
* Creates thread with given worker.
*
* @param worker Runnable to create thread with.
*/
public IgniteThread(GridWorker worker) {
- this(DFLT_GRP, worker.igniteInstanceName(), worker.name(), worker, GRP_IDX_UNASSIGNED, -1);
+ this(worker.igniteInstanceName(), worker.name(), worker, GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED);
}
/**
@@ -69,41 +73,28 @@ public class IgniteThread extends Thread {
* @param r Runnable to execute.
*/
public IgniteThread(String igniteInstanceName, String threadName, Runnable r) {
- this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED, -1);
- }
-
- /**
- * Creates grid thread with given name for a given Ignite instance.
- *
- * @param igniteInstanceName Name of the Ignite instance this thread is created for.
- * @param threadName Name of thread.
- * @param r Runnable to execute.
- * @param grpIdx Index within a group.
- * @param stripe Non-negative stripe number if this thread is striped pool thread.
- */
- public IgniteThread(String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe) {
- this(DFLT_GRP, igniteInstanceName, threadName, r, grpIdx, stripe);
+ this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED);
}
/**
* Creates grid thread with given name for a given Ignite instance with specified
* thread group.
*
- * @param grp Thread group.
* @param igniteInstanceName Name of the Ignite instance this thread is created for.
* @param threadName Name of thread.
* @param r Runnable to execute.
* @param grpIdx Thread index within a group.
* @param stripe Non-negative stripe number if this thread is striped pool thread.
*/
- public IgniteThread(ThreadGroup grp, String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe) {
- super(grp, r, createName(cntr.incrementAndGet(), threadName, igniteInstanceName));
+ public IgniteThread(String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe, byte plc) {
+ super(DFLT_GRP, r, createName(cntr.incrementAndGet(), threadName, igniteInstanceName));
A.ensure(grpIdx >= -1, "grpIdx >= -1");
this.igniteInstanceName = igniteInstanceName;
this.compositeRwLockIdx = grpIdx;
this.stripe = stripe;
+ this.plc = plc;
}
/**
@@ -117,6 +108,14 @@ public class IgniteThread extends Thread {
this.igniteInstanceName = igniteInstanceName;
this.compositeRwLockIdx = GRP_IDX_UNASSIGNED;
this.stripe = -1;
+ this.plc = GridIoPolicy.UNDEFINED;
+ }
+
+ /**
+ * @return Related {@link GridIoPolicy} for internal Ignite pools.
+ */
+ public byte policy() {
+ return plc;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
index d2f0b15..062c973 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
@@ -20,6 +20,7 @@ package org.apache.ignite.thread;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.NotNull;
@@ -37,14 +38,18 @@ public class IgniteThreadFactory implements ThreadFactory {
/** Index generator for threads. */
private final AtomicInteger idxGen = new AtomicInteger();
+ /** */
+ private final byte plc;
+
/**
* Constructs new thread factory for given grid. All threads will belong
* to the same default thread group.
*
* @param igniteInstanceName Ignite instance name.
+ * @param threadName Thread name.
*/
- public IgniteThreadFactory(String igniteInstanceName) {
- this(igniteInstanceName, "ignite");
+ public IgniteThreadFactory(String igniteInstanceName, String threadName) {
+ this(igniteInstanceName, threadName, GridIoPolicy.UNDEFINED);
}
/**
@@ -53,15 +58,17 @@ public class IgniteThreadFactory implements ThreadFactory {
*
* @param igniteInstanceName Ignite instance name.
* @param threadName Thread name.
+ * @param plc {@link GridIoPolicy} for thread pool.
*/
- public IgniteThreadFactory(String igniteInstanceName, String threadName) {
+ public IgniteThreadFactory(String igniteInstanceName, String threadName, byte plc) {
this.igniteInstanceName = igniteInstanceName;
this.threadName = threadName;
+ this.plc = plc;
}
/** {@inheritDoc} */
@Override public Thread newThread(@NotNull Runnable r) {
- return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1);
+ return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1, plc);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
index 639ef94..83c64c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
/**
* An {@link ExecutorService} that executes submitted tasks using pooled grid threads.
@@ -46,13 +47,43 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
int maxPoolSize,
long keepAliveTime,
BlockingQueue<Runnable> workQ) {
+ this(threadNamePrefix,
+ igniteInstanceName,
+ corePoolSize,
+ maxPoolSize,
+ keepAliveTime,
+ workQ,
+ GridIoPolicy.UNDEFINED);
+ }
+
+ /**
+ * Creates a new service with the given initial parameters.
+ *
+ * @param threadNamePrefix Will be added at the beginning of all created threads.
+ * @param igniteInstanceName Must be the name of the grid.
+ * @param corePoolSize The number of threads to keep in the pool, even if they are idle.
+ * @param maxPoolSize The maximum number of threads to allow in the pool.
+ * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time
+ * that excess idle threads will wait for new tasks before terminating.
+ * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only
+ * runnable tasks submitted by the {@link #execute(Runnable)} method.
+ * @param plc {@link GridIoPolicy} for thread pool.
+ */
+ public IgniteThreadPoolExecutor(
+ String threadNamePrefix,
+ String igniteInstanceName,
+ int corePoolSize,
+ int maxPoolSize,
+ long keepAliveTime,
+ BlockingQueue<Runnable> workQ,
+ byte plc) {
super(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
workQ,
- new IgniteThreadFactory(igniteInstanceName, threadNamePrefix)
+ new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, plc)
);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java
index 3948f6a..dce6328 100644
--- a/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java
@@ -64,7 +64,7 @@ public class GridThreadPoolExecutorServiceSelfTest extends GridCommonAbstractTes
* @throws Exception If failed.
*/
public void testSingleGridThreadExecutor() throws Exception {
- ExecutorService exec = Executors.newSingleThreadExecutor(new IgniteThreadFactory("gridName"));
+ ExecutorService exec = Executors.newSingleThreadExecutor(new IgniteThreadFactory("gridName", "testThread"));
exec.submit(new InterruptingRunnable()).get();