You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/25 19:06:29 UTC

incubator-ignite git commit: #[GG-10298]: workable solution.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-gg-10298 [created] 999b5f03a


#[GG-10298]: workable solution.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/999b5f03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/999b5f03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/999b5f03

Branch: refs/heads/ignite-gg-10298
Commit: 999b5f03a0fabb959b3600e3554db01f92d4d79e
Parents: e1c49b7
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu Jun 25 20:06:06 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu Jun 25 20:06:06 2015 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      | 32 ++++++++++++++++++++
 .../ignite/internal/GridKernalContext.java      |  5 +++
 .../ignite/internal/GridKernalContextImpl.java  | 11 +++++++
 .../apache/ignite/internal/IgniteKernal.java    |  4 ++-
 .../org/apache/ignite/internal/IgnitionEx.java  | 12 +++++++-
 .../managers/communication/GridIoManager.java   | 15 ++++++++-
 .../managers/communication/GridIoPolicy.java    |  5 ++-
 .../junits/GridTestKernalContext.java           | 10 ++++++
 8 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 2d36c7a..73d0c80 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -156,12 +156,18 @@ public class IgniteConfiguration {
     /** Default max queue capacity of system thread pool. */
     public static final int DFLT_SYSTEM_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
 
+    /** Default max queue capacity of DR (Data Replication) thread pool. */
+    public static final int DFLT_DR_THREADPOOL_QUEUE_CAP = 40;
+
     /** Default size of peer class loading thread pool. */
     public static final int DFLT_P2P_THREAD_CNT = 2;
 
     /** Default size of management thread pool. */
     public static final int DFLT_MGMT_THREAD_CNT = 4;
 
+    /** Default size of DR (Data Replication) thread pool. */
+    public static final int DFLT_DR_THREAD_CNT = 4;
+
     /** Default segmentation policy. */
     public static final SegmentationPolicy DFLT_SEG_PLC = STOP;
 
@@ -210,6 +216,9 @@ public class IgniteConfiguration {
     /** IGFS pool size. */
     private int igfsPoolSize = AVAILABLE_PROC_CNT;
 
+    /** DR pool size. */
+    private int drPoolSize = DFLT_DR_THREAD_CNT;
+
     /** Utility cache pool size. */
     private int utilityCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
 
@@ -682,6 +691,17 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Size of thread pool that is in charge of processing DR (Data Replication) messages.
+     * <p>
+     * If not provided, executor service will have size {@link #DFLT_DR_THREAD_CNT}.
+     *
+     * @return Thread pool size to be used for DR message processing.
+     */
+    public int getDrThreadPoolSize() {
+        return drPoolSize;
+    }
+
+    /**
      * Default size of thread pool that is in charge of processing utility cache messages.
      * <p>
      * If not provided, executor service will have size {@link #DFLT_SYSTEM_CORE_THREAD_CNT}.
@@ -791,6 +811,18 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Sets thread pool size that will be used to process DR (Data Replication) messages.
+     *
+     * @param poolSize Thread pool size to be used for DR message processing.
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setDrThreadPoolSize(int poolSize) {
+        drPoolSize = poolSize;
+
+        return this;
+    }
+
+    /**
      * Sets default thread pool size that will be used to process utility cache messages.
      *
      * @param poolSize Default executor service size to use for utility cache messages.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index d6542f3..c354609 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -465,6 +465,11 @@ public interface GridKernalContext extends Iterable<GridComponent> {
     public ExecutorService getExecutorService();
 
     /**
+     * @return Thread pool implementation to be used in grid data replication.
+     */
+    public ExecutorService getDrExecutorService();
+
+    /**
      * Executor service that is in charge of processing internal system messages.
      *
      * @return Thread pool implementation to be used in grid for internal system messages.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 65107a7..ecc02a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -254,6 +254,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
+    protected ExecutorService drExecSvc;
+
+    /** */
+    @GridToStringExclude
     protected ExecutorService sysExecSvc;
 
     /** */
@@ -341,6 +345,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
         ExecutorService restExecSvc,
+        ExecutorService drExecSvc,
         List<PluginProvider> plugins) throws IgniteCheckedException {
         assert grid != null;
         assert cfg != null;
@@ -357,6 +362,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         this.mgmtExecSvc = mgmtExecSvc;
         this.igfsExecSvc = igfsExecSvc;
         this.restExecSvc = restExecSvc;
+        this.drExecSvc = drExecSvc;
 
         marshCtx = new MarshallerContextImpl(plugins);
 
@@ -838,6 +844,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public ExecutorService getDrExecutorService() {
+        return drExecSvc;
+    }
+
+    /** {@inheritDoc} */
     @Override public ExecutorService getSystemExecutorService() {
         return sysExecSvc;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e19d3d3..2c6596b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -77,7 +77,6 @@ import javax.management.*;
 import java.io.*;
 import java.lang.management.*;
 import java.lang.reflect.*;
-import java.security.*;
 import java.text.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -528,6 +527,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
      * @param mgmtExecSvc Management executor service.
      * @param igfsExecSvc IGFS executor service.
      * @param restExecSvc Reset executor service.
+     * @param drExecSvc DR (Data Replication) executor service.
      * @param errHnd Error handler to use for notification about startup problems.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
@@ -541,6 +541,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
         ExecutorService restExecSvc,
+        ExecutorService drExecSvc,
         GridAbsClosure errHnd)
         throws IgniteCheckedException
     {
@@ -646,6 +647,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 mgmtExecSvc,
                 igfsExecSvc,
                 restExecSvc,
+                drExecSvc,
                 plugins);
 
             cfg.getMarshaller().setContext(ctx.marshallerContext());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 5cbe377..b502f14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1296,6 +1296,9 @@ public class IgnitionEx {
         /** Marshaller cache executor service. */
         private ExecutorService marshCacheExecSvc;
 
+        /** DR (Data Replication) executor service. */
+        private ExecutorService drExecSvc;
+
         /** Grid state. */
         private volatile IgniteState state = STOPPED;
 
@@ -1524,6 +1527,13 @@ public class IgnitionEx {
                 myCfg.getMarshallerCacheKeepAliveTime(),
                 new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
 
+            drExecSvc = new IgniteThreadPoolExecutor(
+                    "data-replication-" + cfg.getGridName(),
+                    cfg.getDrThreadPoolSize(),
+                    cfg.getDrThreadPoolSize(),
+                    DFLT_PUBLIC_KEEP_ALIVE_TIME,
+                    new LinkedBlockingQueue<Runnable>(DFLT_DR_THREADPOOL_QUEUE_CAP));
+
             // Register Ignite MBean for current grid instance.
             registerFactoryMbean(myCfg.getMBeanServer());
 
@@ -1536,7 +1546,7 @@ public class IgnitionEx {
                 grid = grid0;
 
                 grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc,
-                    igfsExecSvc, restExecSvc,
+                    igfsExecSvc, restExecSvc, drExecSvc,
                     new CA() {
                         @Override public void apply() {
                             startLatch.countDown();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 4382731..b522a1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -83,6 +83,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** Affinity assignment executor service. */
     private ExecutorService affPool;
 
+    /** DR (Data Replication) executor service. */
+    private ExecutorService drPool;
+
     /** Utility cache pool. */
     private ExecutorService utilityCachePool;
 
@@ -185,6 +188,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         p2pPool = ctx.getPeerClassLoadingExecutorService();
         sysPool = ctx.getSystemExecutorService();
         mgmtPool = ctx.getManagementExecutorService();
+        drPool  = ctx.getDrExecutorService();
         utilityCachePool = ctx.utilityCachePool();
         marshCachePool = ctx.marshallerCachePool();
         affPool = new IgniteThreadPoolExecutor(
@@ -531,7 +535,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 case MANAGEMENT_POOL:
                 case AFFINITY_POOL:
                 case UTILITY_CACHE_POOL:
-                case MARSH_CACHE_POOL: {
+                case MARSH_CACHE_POOL:
+                case DR_POOL:
+                {
                     if (msg.isOrdered())
                         processOrderedMessage(nodeId, msg, plc, msgC);
                     else
@@ -539,6 +545,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
                     break;
                 }
+
+                default:
+                    throw new IllegalStateException("Failed to process message due to " +
+                        "unknown policy, [policy=" + msg.policy() + ']');
             }
         }
         catch (IgniteCheckedException e) {
@@ -568,6 +578,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             case AFFINITY_POOL:
                 return affPool;
 
+            case DR_POOL:
+                return drPool;
+
             case UTILITY_CACHE_POOL:
                 assert utilityCachePool != null : "Utility cache pool is not configured.";
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 6e45043..ad74baa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -43,7 +43,10 @@ public enum GridIoPolicy {
     UTILITY_CACHE_POOL,
 
     /** Marshaller cache execution pool. */
-    MARSH_CACHE_POOL;
+    MARSH_CACHE_POOL,
+
+    /** DR internal messaging pool. */
+    DR_POOL;
 
     /** Enum values. */
     private static final GridIoPolicy[] VALS = values();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index 24502da..4485d5e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -46,6 +46,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
             null,
             null,
             null,
+            null,
             U.allPluginProviders());
 
         GridTestUtils.setFieldValue(grid(), "cfg", config());
@@ -102,4 +103,13 @@ public class GridTestKernalContext extends GridKernalContextImpl {
     public void setExecutorService(ExecutorService execSvc){
         this.execSvc = execSvc;
     }
+
+    /**
+     * Sets DR (Data Replication) executor service.
+     *
+     * @param execSvc DR executor service.
+     */
+    public void setDrExecutorService(ExecutorService execSvc){
+        this.drExecSvc = execSvc;
+    }
 }