You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/25 19:06:29 UTC
incubator-ignite git commit: #[GG-10298]: workable solution.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-gg-10298 [created] 999b5f03a
#[GG-10298]: workable solution.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/999b5f03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/999b5f03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/999b5f03
Branch: refs/heads/ignite-gg-10298
Commit: 999b5f03a0fabb959b3600e3554db01f92d4d79e
Parents: e1c49b7
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu Jun 25 20:06:06 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu Jun 25 20:06:06 2015 +0300
----------------------------------------------------------------------
.../configuration/IgniteConfiguration.java | 32 ++++++++++++++++++++
.../ignite/internal/GridKernalContext.java | 5 +++
.../ignite/internal/GridKernalContextImpl.java | 11 +++++++
.../apache/ignite/internal/IgniteKernal.java | 4 ++-
.../org/apache/ignite/internal/IgnitionEx.java | 12 +++++++-
.../managers/communication/GridIoManager.java | 15 ++++++++-
.../managers/communication/GridIoPolicy.java | 5 ++-
.../junits/GridTestKernalContext.java | 10 ++++++
8 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 2d36c7a..73d0c80 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -156,12 +156,18 @@ public class IgniteConfiguration {
/** Default max queue capacity of system thread pool. */
public static final int DFLT_SYSTEM_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
+ /** Default max queue capacity of DR (Data Replication) thread pool. */
+ public static final int DFLT_DR_THREADPOOL_QUEUE_CAP = 40;
+
/** Default size of peer class loading thread pool. */
public static final int DFLT_P2P_THREAD_CNT = 2;
/** Default size of management thread pool. */
public static final int DFLT_MGMT_THREAD_CNT = 4;
+ /** Default size of DR (Data Replication) thread pool. */
+ public static final int DFLT_DR_THREAD_CNT = 4;
+
/** Default segmentation policy. */
public static final SegmentationPolicy DFLT_SEG_PLC = STOP;
@@ -210,6 +216,9 @@ public class IgniteConfiguration {
/** IGFS pool size. */
private int igfsPoolSize = AVAILABLE_PROC_CNT;
+ /** DR pool size. */
+ private int drPoolSize = DFLT_DR_THREAD_CNT;
+
/** Utility cache pool size. */
private int utilityCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
@@ -682,6 +691,17 @@ public class IgniteConfiguration {
}
/**
+ * Size of thread pool that is in charge of processing DR (Data Replication) messages.
+ * <p>
+ * If not provided, executor service will have size {@link #DFLT_DR_THREAD_CNT}.
+ *
+ * @return Thread pool size to be used for DR message processing.
+ */
+ public int getDrThreadPoolSize() {
+ return drPoolSize;
+ }
+
+ /**
* Default size of thread pool that is in charge of processing utility cache messages.
* <p>
* If not provided, executor service will have size {@link #DFLT_SYSTEM_CORE_THREAD_CNT}.
@@ -791,6 +811,18 @@ public class IgniteConfiguration {
}
/**
+ * Sets thread pool size that will be used to send and receive DR (Data Replication) messages.
+ *
+ * @param poolSize Thread pool size to use for DR (Data Replication) messages.
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setDrThreadPoolSize(int poolSize) {
+ drPoolSize = poolSize;
+
+ return this;
+ }
+
+ /**
* Sets default thread pool size that will be used to process utility cache messages.
*
* @param poolSize Default executor service size to use for utility cache messages.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index d6542f3..c354609 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -465,6 +465,11 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public ExecutorService getExecutorService();
/**
+ * @return Thread pool implementation to be used in grid data replication.
+ */
+ public ExecutorService getDrExecutorService();
+
+ /**
* Executor service that is in charge of processing internal system messages.
*
* @return Thread pool implementation to be used in grid for internal system messages.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 65107a7..ecc02a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -254,6 +254,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringExclude
+ protected ExecutorService drExecSvc;
+
+ /** */
+ @GridToStringExclude
protected ExecutorService sysExecSvc;
/** */
@@ -341,6 +345,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
ExecutorService mgmtExecSvc,
ExecutorService igfsExecSvc,
ExecutorService restExecSvc,
+ ExecutorService drExecSvc,
List<PluginProvider> plugins) throws IgniteCheckedException {
assert grid != null;
assert cfg != null;
@@ -357,6 +362,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
this.mgmtExecSvc = mgmtExecSvc;
this.igfsExecSvc = igfsExecSvc;
this.restExecSvc = restExecSvc;
+ this.drExecSvc = drExecSvc;
marshCtx = new MarshallerContextImpl(plugins);
@@ -838,6 +844,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public ExecutorService getDrExecutorService() {
+ return drExecSvc;
+ }
+
+ /** {@inheritDoc} */
@Override public ExecutorService getSystemExecutorService() {
return sysExecSvc;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e19d3d3..2c6596b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -77,7 +77,6 @@ import javax.management.*;
import java.io.*;
import java.lang.management.*;
import java.lang.reflect.*;
-import java.security.*;
import java.text.*;
import java.util.*;
import java.util.concurrent.*;
@@ -528,6 +527,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
* @param mgmtExecSvc Management executor service.
* @param igfsExecSvc IGFS executor service.
* @param restExecSvc Reset executor service.
+ * @param drExecSvc DR (Data Replication) executor service.
* @param errHnd Error handler to use for notification about startup problems.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@@ -541,6 +541,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ExecutorService mgmtExecSvc,
ExecutorService igfsExecSvc,
ExecutorService restExecSvc,
+ ExecutorService drExecSvc,
GridAbsClosure errHnd)
throws IgniteCheckedException
{
@@ -646,6 +647,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
mgmtExecSvc,
igfsExecSvc,
restExecSvc,
+ drExecSvc,
plugins);
cfg.getMarshaller().setContext(ctx.marshallerContext());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 5cbe377..b502f14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1296,6 +1296,9 @@ public class IgnitionEx {
/** Marshaller cache executor service. */
private ExecutorService marshCacheExecSvc;
+ /** DR (Data Replication) executor service. */
+ private ExecutorService drExecSvc;
+
/** Grid state. */
private volatile IgniteState state = STOPPED;
@@ -1524,6 +1527,13 @@ public class IgnitionEx {
myCfg.getMarshallerCacheKeepAliveTime(),
new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+ drExecSvc = new IgniteThreadPoolExecutor(
+ "data-replication-" + cfg.getGridName(),
+ cfg.getDrThreadPoolSize(),
+ cfg.getDrThreadPoolSize(),
+ DFLT_PUBLIC_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<Runnable>(DFLT_DR_THREADPOOL_QUEUE_CAP));
+
// Register Ignite MBean for current grid instance.
registerFactoryMbean(myCfg.getMBeanServer());
@@ -1536,7 +1546,7 @@ public class IgnitionEx {
grid = grid0;
grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc,
- igfsExecSvc, restExecSvc,
+ igfsExecSvc, restExecSvc, drExecSvc,
new CA() {
@Override public void apply() {
startLatch.countDown();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 4382731..b522a1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -83,6 +83,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** Affinity assignment executor service. */
private ExecutorService affPool;
+ /** DR (Data Replication) executor service. */
+ private ExecutorService drPool;
+
/** Utility cache pool. */
private ExecutorService utilityCachePool;
@@ -185,6 +188,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
p2pPool = ctx.getPeerClassLoadingExecutorService();
sysPool = ctx.getSystemExecutorService();
mgmtPool = ctx.getManagementExecutorService();
+ drPool = ctx.getDrExecutorService();
utilityCachePool = ctx.utilityCachePool();
marshCachePool = ctx.marshallerCachePool();
affPool = new IgniteThreadPoolExecutor(
@@ -531,7 +535,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
case MANAGEMENT_POOL:
case AFFINITY_POOL:
case UTILITY_CACHE_POOL:
- case MARSH_CACHE_POOL: {
+ case MARSH_CACHE_POOL:
+ case DR_POOL:
+ {
if (msg.isOrdered())
processOrderedMessage(nodeId, msg, plc, msgC);
else
@@ -539,6 +545,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
break;
}
+
+ default:
+ throw new IllegalStateException("Failed to process message dues to " +
+ "unknown policy, [policy=" + msg.policy() + ']');
}
}
catch (IgniteCheckedException e) {
@@ -568,6 +578,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
case AFFINITY_POOL:
return affPool;
+ case DR_POOL:
+ return drPool;
+
case UTILITY_CACHE_POOL:
assert utilityCachePool != null : "Utility cache pool is not configured.";
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 6e45043..ad74baa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -43,7 +43,10 @@ public enum GridIoPolicy {
UTILITY_CACHE_POOL,
/** Marshaller cache execution pool. */
- MARSH_CACHE_POOL;
+ MARSH_CACHE_POOL,
+
+ /** DR internal messaging pool. */
+ DR_POOL;
/** Enum values. */
private static final GridIoPolicy[] VALS = values();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/999b5f03/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index 24502da..4485d5e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -46,6 +46,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
null,
null,
null,
+ null,
U.allPluginProviders());
GridTestUtils.setFieldValue(grid(), "cfg", config());
@@ -102,4 +103,13 @@ public class GridTestKernalContext extends GridKernalContextImpl {
public void setExecutorService(ExecutorService execSvc){
this.execSvc = execSvc;
}
+
+ /**
+ * Sets DR (Data Replication) executor service.
+ *
+ * @param execSvc DR (Data Replication) executor service.
+ */
+ public void setDrExecutorService(ExecutorService execSvc){
+ this.drExecSvc = execSvc;
+ }
}