You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/01 09:27:14 UTC

[29/50] [abbrv] ignite git commit: Added related GridIoPolicy in IgniteThread.

Added related GridIoPolicy in IgniteThread.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a3650fb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a3650fb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a3650fb

Branch: refs/heads/ignite-5757
Commit: 3a3650fb2eadcd6a1186cc186fa5d471cd1e74d5
Parents: 2574beb
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 27 13:57:11 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 27 13:57:11 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  | 34 +++++++++++++------
 .../managers/communication/GridIoPolicy.java    |  3 ++
 .../service/GridServiceProcessor.java           |  2 +-
 .../util/StripedCompositeReadWriteLock.java     |  6 ++--
 .../ignite/internal/util/StripedExecutor.java   |  4 ++-
 .../org/apache/ignite/thread/IgniteThread.java  | 35 ++++++++++----------
 .../ignite/thread/IgniteThreadFactory.java      | 15 ++++++---
 .../ignite/thread/IgniteThreadPoolExecutor.java | 33 +++++++++++++++++-
 .../GridThreadPoolExecutorServiceSelfTest.java  |  2 +-
 9 files changed, 95 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 1139ec6..23baeb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -64,6 +64,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
 import org.apache.ignite.internal.processors.igfs.IgfsThreadFactory;
 import org.apache.ignite.internal.processors.igfs.IgfsUtils;
@@ -1694,7 +1695,8 @@ public class IgnitionEx {
                 cfg.getPublicThreadPoolSize(),
                 cfg.getPublicThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<Runnable>(),
+                GridIoPolicy.PUBLIC_POOL);
 
             execSvc.allowCoreThreadTimeOut(true);
 
@@ -1706,7 +1708,8 @@ public class IgnitionEx {
                 cfg.getServiceThreadPoolSize(),
                 cfg.getServiceThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<Runnable>(),
+                GridIoPolicy.SERVICE_POOL);
 
             svcExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1718,7 +1721,8 @@ public class IgnitionEx {
                 cfg.getSystemThreadPoolSize(),
                 cfg.getSystemThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<Runnable>(),
+                GridIoPolicy.SYSTEM_POOL);
 
             sysExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1738,7 +1742,8 @@ public class IgnitionEx {
                 cfg.getManagementThreadPoolSize(),
                 cfg.getManagementThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<Runnable>(),
+                GridIoPolicy.MANAGEMENT_POOL);
 
             mgmtExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1753,7 +1758,8 @@ public class IgnitionEx {
                 cfg.getPeerClassLoadingThreadPoolSize(),
                 cfg.getPeerClassLoadingThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<Runnable>(),
+                GridIoPolicy.P2P_POOL);
 
             p2pExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1764,7 +1770,8 @@ public class IgnitionEx {
                 cfg.getDataStreamerThreadPoolSize(),
                 cfg.getDataStreamerThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<Runnable>(),
+                GridIoPolicy.DATA_STREAMER_POOL);
 
             dataStreamerExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1811,7 +1818,8 @@ public class IgnitionEx {
                 myCfg.getUtilityCacheThreadPoolSize(),
                 myCfg.getUtilityCacheThreadPoolSize(),
                 myCfg.getUtilityCacheKeepAliveTime(),
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<Runnable>(),
+                GridIoPolicy.UTILITY_CACHE_POOL);
 
             utilityCacheExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1821,7 +1829,8 @@ public class IgnitionEx {
                 1,
                 1,
                 DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<Runnable>(),
+                GridIoPolicy.AFFINITY_POOL);
 
             affExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1834,7 +1843,8 @@ public class IgnitionEx {
                     cpus,
                     cpus * 2,
                     3000L,
-                    new LinkedBlockingQueue<Runnable>(1000)
+                    new LinkedBlockingQueue<Runnable>(1000),
+                    GridIoPolicy.IDX_POOL
                 );
             }
 
@@ -1846,7 +1856,8 @@ public class IgnitionEx {
                 cfg.getQueryThreadPoolSize(),
                 cfg.getQueryThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<Runnable>(),
+                GridIoPolicy.QUERY_POOL);
 
             qryExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1856,7 +1867,8 @@ public class IgnitionEx {
                 2,
                 2,
                 DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<Runnable>(),
+                GridIoPolicy.SCHEMA_POOL);
 
             schemaExecSvc.allowCoreThreadTimeOut(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 13bc4c4..3f31f92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -22,6 +22,9 @@ package org.apache.ignite.internal.managers.communication;
  * message processing by the communication manager.
  */
 public class GridIoPolicy {
+    /** */
+    public static final byte UNDEFINED = -1;
+
     /** Public execution pool. */
     public static final byte PUBLIC_POOL = 0;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index db632ec..46fcfea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -153,7 +153,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     private volatile GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
     /** Thread factory. */
-    private ThreadFactory threadFactory = new IgniteThreadFactory(ctx.igniteInstanceName());
+    private ThreadFactory threadFactory = new IgniteThreadFactory(ctx.igniteInstanceName(), "service");
 
     /** Thread local for service name. */
     private ThreadLocal<String> svcName = new ThreadLocal<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
index e215663..18ef06c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
@@ -67,8 +67,10 @@ public class StripedCompositeReadWriteLock implements ReadWriteLock {
     @NotNull @Override public Lock readLock() {
         int idx;
 
-        if (Thread.currentThread() instanceof IgniteThread) {
-            IgniteThread igniteThread = (IgniteThread)Thread.currentThread();
+        Thread curThread = Thread.currentThread();
+
+        if (curThread instanceof IgniteThread) {
+            IgniteThread igniteThread = (IgniteThread)curThread;
 
             idx = igniteThread.compositeRwLockIndex();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 6c85b32..6d5dc71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -439,7 +440,8 @@ public class StripedExecutor implements ExecutorService {
                 poolName + "-stripe-" + idx,
                 this,
                 IgniteThread.GRP_IDX_UNASSIGNED,
-                idx);
+                idx,
+                GridIoPolicy.UNDEFINED);
 
             thread.start();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index 6005ac9..c814625 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.thread;
 
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -52,13 +53,16 @@ public class IgniteThread extends Thread {
     /** */
     private final int stripe;
 
+    /** */
+    private final byte plc;
+
     /**
      * Creates thread with given worker.
      *
      * @param worker Runnable to create thread with.
      */
     public IgniteThread(GridWorker worker) {
-        this(DFLT_GRP, worker.igniteInstanceName(), worker.name(), worker, GRP_IDX_UNASSIGNED, -1);
+        this(worker.igniteInstanceName(), worker.name(), worker, GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED);
     }
 
     /**
@@ -69,41 +73,28 @@ public class IgniteThread extends Thread {
      * @param r Runnable to execute.
      */
     public IgniteThread(String igniteInstanceName, String threadName, Runnable r) {
-        this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED, -1);
-    }
-
-    /**
-     * Creates grid thread with given name for a given Ignite instance.
-     *
-     * @param igniteInstanceName Name of the Ignite instance this thread is created for.
-     * @param threadName Name of thread.
-     * @param r Runnable to execute.
-     * @param grpIdx Index within a group.
-     * @param stripe Non-negative stripe number if this thread is striped pool thread.
-     */
-    public IgniteThread(String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe) {
-        this(DFLT_GRP, igniteInstanceName, threadName, r, grpIdx, stripe);
+        this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED, -1, GridIoPolicy.UNDEFINED);
     }
 
     /**
      * Creates grid thread with given name for a given Ignite instance with specified
      * thread group.
      *
-     * @param grp Thread group.
      * @param igniteInstanceName Name of the Ignite instance this thread is created for.
      * @param threadName Name of thread.
      * @param r Runnable to execute.
      * @param grpIdx Thread index within a group.
      * @param stripe Non-negative stripe number if this thread is striped pool thread.
      */
-    public IgniteThread(ThreadGroup grp, String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe) {
-        super(grp, r, createName(cntr.incrementAndGet(), threadName, igniteInstanceName));
+    public IgniteThread(String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe, byte plc) {
+        super(DFLT_GRP, r, createName(cntr.incrementAndGet(), threadName, igniteInstanceName));
 
         A.ensure(grpIdx >= -1, "grpIdx >= -1");
 
         this.igniteInstanceName = igniteInstanceName;
         this.compositeRwLockIdx = grpIdx;
         this.stripe = stripe;
+        this.plc = plc;
     }
 
     /**
@@ -117,6 +108,14 @@ public class IgniteThread extends Thread {
         this.igniteInstanceName = igniteInstanceName;
         this.compositeRwLockIdx = GRP_IDX_UNASSIGNED;
         this.stripe = -1;
+        this.plc = GridIoPolicy.UNDEFINED;
+    }
+
+    /**
+     * @return Related {@link GridIoPolicy} for internal Ignite pools.
+     */
+    public byte policy() {
+        return plc;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
index d2f0b15..062c973 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
@@ -20,6 +20,7 @@ package org.apache.ignite.thread;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.NotNull;
 
@@ -37,14 +38,18 @@ public class IgniteThreadFactory implements ThreadFactory {
     /** Index generator for threads. */
     private final AtomicInteger idxGen = new AtomicInteger();
 
+    /** */
+    private final byte plc;
+
     /**
      * Constructs new thread factory for given grid. All threads will belong
      * to the same default thread group.
      *
      * @param igniteInstanceName Ignite instance name.
+     * @param threadName Thread name.
      */
-    public IgniteThreadFactory(String igniteInstanceName) {
-        this(igniteInstanceName, "ignite");
+    public IgniteThreadFactory(String igniteInstanceName, String threadName) {
+        this(igniteInstanceName, threadName, GridIoPolicy.UNDEFINED);
     }
 
     /**
@@ -53,15 +58,17 @@ public class IgniteThreadFactory implements ThreadFactory {
      *
      * @param igniteInstanceName Ignite instance name.
      * @param threadName Thread name.
+     * @param plc {@link GridIoPolicy} for thread pool.
      */
-    public IgniteThreadFactory(String igniteInstanceName, String threadName) {
+    public IgniteThreadFactory(String igniteInstanceName, String threadName, byte plc) {
         this.igniteInstanceName = igniteInstanceName;
         this.threadName = threadName;
+        this.plc = plc;
     }
 
     /** {@inheritDoc} */
     @Override public Thread newThread(@NotNull Runnable r) {
-        return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1);
+        return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1, plc);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
index 639ef94..83c64c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 
 /**
  * An {@link ExecutorService} that executes submitted tasks using pooled grid threads.
@@ -46,13 +47,43 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
         int maxPoolSize,
         long keepAliveTime,
         BlockingQueue<Runnable> workQ) {
+        this(threadNamePrefix,
+            igniteInstanceName,
+            corePoolSize,
+            maxPoolSize,
+            keepAliveTime,
+            workQ,
+            GridIoPolicy.UNDEFINED);
+    }
+
+    /**
+     * Creates a new service with the given initial parameters.
+     *
+     * @param threadNamePrefix Will be added at the beginning of all created threads.
+     * @param igniteInstanceName Must be the name of the grid.
+     * @param corePoolSize The number of threads to keep in the pool, even if they are idle.
+     * @param maxPoolSize The maximum number of threads to allow in the pool.
+     * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time
+     *      that excess idle threads will wait for new tasks before terminating.
+     * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only
+     *      runnable tasks submitted by the {@link #execute(Runnable)} method.
+     * @param plc {@link GridIoPolicy} for thread pool.
+     */
+    public IgniteThreadPoolExecutor(
+        String threadNamePrefix,
+        String igniteInstanceName,
+        int corePoolSize,
+        int maxPoolSize,
+        long keepAliveTime,
+        BlockingQueue<Runnable> workQ,
+        byte plc) {
         super(
             corePoolSize,
             maxPoolSize,
             keepAliveTime,
             TimeUnit.MILLISECONDS,
             workQ,
-            new IgniteThreadFactory(igniteInstanceName, threadNamePrefix)
+            new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, plc)
         );
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a3650fb/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java
index 3948f6a..dce6328 100644
--- a/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/thread/GridThreadPoolExecutorServiceSelfTest.java
@@ -64,7 +64,7 @@ public class GridThreadPoolExecutorServiceSelfTest extends GridCommonAbstractTes
      * @throws Exception If failed.
      */
     public void testSingleGridThreadExecutor() throws Exception {
-        ExecutorService exec = Executors.newSingleThreadExecutor(new IgniteThreadFactory("gridName"));
+        ExecutorService exec = Executors.newSingleThreadExecutor(new IgniteThreadFactory("gridName", "testThread"));
 
         exec.submit(new InterruptingRunnable()).get();