You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/02 16:12:26 UTC
[04/50] incubator-ignite git commit: #ignite-239: small refactoring.
#ignite-239: small refactoring.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c1b4ad70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c1b4ad70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c1b4ad70
Branch: refs/heads/ignite-334
Commit: c1b4ad70cf1d1d954981c1056e4e66b021c465c7
Parents: 7e2b2e2
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Feb 20 18:29:11 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Feb 20 18:29:11 2015 +0300
----------------------------------------------------------------------
.../configuration/IgniteConfiguration.java | 4 +-
.../org/apache/ignite/internal/IgnitionEx.java | 352 ++++++++++---------
2 files changed, 179 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b4ad70/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index e554aea..d44b057 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -384,7 +384,6 @@ public class IgniteConfiguration {
*
* @param cfg Grid configuration to copy from.
*/
- @SuppressWarnings("deprecation")
public IgniteConfiguration(IgniteConfiguration cfg) {
assert cfg != null;
@@ -455,7 +454,8 @@ public class IgniteConfiguration {
sysPoolSize = cfg.getSystemThreadPoolSize();
timeSrvPortBase = cfg.getTimeServerPortBase();
timeSrvPortRange = cfg.getTimeServerPortRange();
- txCfg = cfg.getTransactionConfiguration();
+ txCfg = cfg.getTransactionConfiguration() != null ?
+ new TransactionConfiguration(cfg.getTransactionConfiguration()) : null;
userAttrs = cfg.getUserAttributes();
waitForSegOnStart = cfg.isWaitForSegmentOnStart();
warmupClos = cfg.getWarmupClosure();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b4ad70/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 34f29a7..9c196f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1333,6 +1333,8 @@ public class IgnitionEx {
if (nodeId == null)
nodeId = UUID.randomUUID();
+ myCfg.setNodeId(nodeId);
+
IgniteLogger cfgLog = initLogger(cfg.getGridLogger(), nodeId);
assert cfgLog != null;
@@ -1342,6 +1344,8 @@ public class IgnitionEx {
// Initialize factory's log.
log = cfgLog.getLogger(G.class);
+ myCfg.setGridLogger(cfgLog);
+
// Check Ignite home folder (after log is available).
if (ggHome != null) {
File ggHomeFile = new File(ggHome);
@@ -1352,13 +1356,6 @@ public class IgnitionEx {
myCfg.setIgniteHome(ggHome);
- myCfg.setTransactionConfiguration(new TransactionConfiguration(cfg.getTransactionConfiguration()));
-
- ConnectorConfiguration clientCfg = cfg.getConnectorConfiguration();
-
- if (clientCfg != null)
- clientCfg = new ConnectorConfiguration(clientCfg);
-
// Local host.
String locHost = IgniteSystemProperties.getString(IGNITE_LOCAL_HOST);
@@ -1391,69 +1388,10 @@ public class IgnitionEx {
}
}
- Map<String, ?> attrs = cfg.getUserAttributes();
-
- if (attrs == null)
- attrs = Collections.emptyMap();
-
- MBeanServer mbSrv = cfg.getMBeanServer();
-
- Marshaller marsh = cfg.getMarshaller();
-
- String[] p2pExclude = cfg.getPeerClassLoadingLocalClassPathExclude();
-
-
- execSvc = new IgniteThreadPoolExecutor(
- "pub-" + cfg.getGridName(),
- cfg.getPublicThreadPoolSize(),
- cfg.getPublicThreadPoolSize(),
- DFLT_PUBLIC_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
-
- // Pre-start all threads as they are guaranteed to be needed.
- ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
-
- // Note that since we use 'LinkedBlockingQueue', number of
- // maximum threads has no effect.
- sysExecSvc = new IgniteThreadPoolExecutor(
- "sys-" + cfg.getGridName(),
- cfg.getSystemThreadPoolSize(),
- cfg.getSystemThreadPoolSize(),
- DFLT_SYSTEM_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
-
- // Pre-start all threads as they are guaranteed to be needed.
- ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
-
- // Note that since we use 'LinkedBlockingQueue', number of
- // maximum threads has no effect.
- // Note, that we do not pre-start threads here as management pool may
- // not be needed.
- mgmtExecSvc = new IgniteThreadPoolExecutor(
- "mgmt-" + cfg.getGridName(),
- cfg.getManagementThreadPoolSize(),
- cfg.getManagementThreadPoolSize(),
- 0,
- new LinkedBlockingQueue<Runnable>());
-
- // Note that since we use 'LinkedBlockingQueue', number of
- // maximum threads has no effect.
- // Note, that we do not pre-start threads here as class loading pool may
- // not be needed.
- p2pExecSvc = new IgniteThreadPoolExecutor(
- "p2p-" + cfg.getGridName(),
- cfg.getPeerClassLoadingThreadPoolSize(),
- cfg.getPeerClassLoadingThreadPoolSize(),
- 0,
- new LinkedBlockingQueue<Runnable>());
+ ConnectorConfiguration clientCfg = cfg.getConnectorConfiguration();
- // Note that we do not pre-start threads here as igfs pool may not be needed.
- igfsExecSvc = new IgniteThreadPoolExecutor(
- "igfs-" + cfg.getGridName(),
- cfg.getIgfsThreadPoolSize(),
- cfg.getIgfsThreadPoolSize(),
- 0,
- new LinkedBlockingQueue<Runnable>());
+ if (clientCfg != null)
+ clientCfg = new ConnectorConfiguration(clientCfg);
if (clientCfg != null) {
restExecSvc = new IgniteThreadPoolExecutor(
@@ -1465,12 +1403,10 @@ public class IgnitionEx {
);
}
- utilityCacheExecSvc = new IgniteThreadPoolExecutor(
- "utility-" + cfg.getGridName(),
- DFLT_SYSTEM_CORE_THREAD_CNT,
- DFLT_SYSTEM_MAX_THREAD_CNT,
- DFLT_SYSTEM_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+ // REST configuration.
+ myCfg.setConnectorConfiguration(clientCfg);
+
+ Marshaller marsh = cfg.getMarshaller();
if (marsh == null) {
if (!U.isHotSpot()) {
@@ -1500,12 +1436,18 @@ public class IgnitionEx {
"Using GridOptimizedMarshaller on untested JVM.");
}
+ myCfg.setMarshaller(marsh);
+
+ Map<String, ?> attrs = cfg.getUserAttributes();
+
+ if (attrs == null)
+ attrs = Collections.emptyMap();
+
myCfg.setUserAttributes(attrs);
+
+ MBeanServer mbSrv = cfg.getMBeanServer();
+
myCfg.setMBeanServer(mbSrv == null ? ManagementFactory.getPlatformMBeanServer() : mbSrv);
- myCfg.setGridLogger(cfgLog);
- myCfg.setMarshaller(marsh);
- myCfg.setMarshalLocalJobs(cfg.isMarshalLocalJobs());
- myCfg.setNodeId(nodeId);
IgfsConfiguration[] igfsCfgs = cfg.getIgfsConfiguration();
@@ -1529,14 +1471,13 @@ public class IgnitionEx {
myCfg.setStreamerConfiguration(clone);
}
+ String[] p2pExclude = cfg.getPeerClassLoadingLocalClassPathExclude();
+
if (p2pExclude == null)
p2pExclude = EMPTY_STR_ARR;
myCfg.setPeerClassLoadingLocalClassPathExclude(p2pExclude);
- // REST configuration.
- myCfg.setConnectorConfiguration(clientCfg);
-
// Validate segmentation configuration.
GridSegmentationPolicy segPlc = cfg.getSegmentationPolicy();
@@ -1547,85 +1488,7 @@ public class IgnitionEx {
"on start?) [segPlc=" + segPlc + ", wait=false]");
}
-
-
- /*
- * Initialize default SPI implementations.
- */
- CommunicationSpi commSpi = cfg.getCommunicationSpi();
- DiscoverySpi discoSpi = cfg.getDiscoverySpi();
- EventStorageSpi evtSpi = cfg.getEventStorageSpi();
- CollisionSpi colSpi = cfg.getCollisionSpi();
- DeploymentSpi deploySpi = cfg.getDeploymentSpi();
- CheckpointSpi[] cpSpi = cfg.getCheckpointSpi();
- FailoverSpi[] failSpi = cfg.getFailoverSpi();
- LoadBalancingSpi[] loadBalancingSpi = cfg.getLoadBalancingSpi();
- SwapSpaceSpi swapspaceSpi = cfg.getSwapSpaceSpi();
- IndexingSpi indexingSpi = cfg.getIndexingSpi();
-
-
- if (commSpi == null)
- commSpi = new TcpCommunicationSpi();
-
- if (discoSpi == null)
- discoSpi = new TcpDiscoverySpi();
-
- if (discoSpi instanceof TcpDiscoverySpi) {
- TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi)discoSpi;
-
- if (tcpDisco.getIpFinder() == null)
- tcpDisco.setIpFinder(new TcpDiscoveryMulticastIpFinder());
- }
-
- if (evtSpi == null)
- evtSpi = new MemoryEventStorageSpi();
-
- if (colSpi == null)
- colSpi = new NoopCollisionSpi();
-
- if (deploySpi == null)
- deploySpi = new LocalDeploymentSpi();
-
- if (cpSpi == null)
- cpSpi = new CheckpointSpi[] {new NoopCheckpointSpi()};
-
- if (failSpi == null)
- failSpi = new FailoverSpi[] {new AlwaysFailoverSpi()};
-
- if (loadBalancingSpi == null)
- loadBalancingSpi = new LoadBalancingSpi[] {new RoundRobinLoadBalancingSpi()};
-
- if (swapspaceSpi == null) {
- boolean needSwap = false;
-
- CacheConfiguration[] caches = cfg.getCacheConfiguration();
-
- if (caches != null) {
- for (CacheConfiguration c : caches) {
- if (c.isSwapEnabled()) {
- needSwap = true;
-
- break;
- }
- }
- }
-
- swapspaceSpi = needSwap ? new FileSwapSpaceSpi() : new NoopSwapSpaceSpi();
- }
-
- if (indexingSpi == null)
- indexingSpi = new NoopIndexingSpi();
-
- myCfg.setCommunicationSpi(commSpi);
- myCfg.setDiscoverySpi(discoSpi);
- myCfg.setCheckpointSpi(cpSpi);
- myCfg.setEventStorageSpi(evtSpi);
- myCfg.setDeploymentSpi(deploySpi);
- myCfg.setFailoverSpi(failSpi);
- myCfg.setCollisionSpi(colSpi);
- myCfg.setLoadBalancingSpi(loadBalancingSpi);
- myCfg.setSwapSpaceSpi(swapspaceSpi);
- myCfg.setIndexingSpi(indexingSpi);
+ copySpis(cfg, myCfg, startCtx.single());
CacheConfiguration[] cacheCfgs = cfg.getCacheConfiguration();
@@ -1633,12 +1496,12 @@ public class IgnitionEx {
final boolean hasAtomics = cfg.getAtomicConfiguration() != null;
- final boolean clientDisco = discoSpi instanceof TcpClientDiscoverySpi;
+ final boolean clientDisco = myCfg.getDiscoverySpi() instanceof TcpClientDiscoverySpi;
CacheConfiguration[] copies;
if (cacheCfgs != null && cacheCfgs.length > 0) {
- if (!U.discoOrdered(discoSpi) && !U.relaxDiscoveryOrdered())
+ if (!U.discoOrdered(myCfg.getDiscoverySpi()) && !U.relaxDiscoveryOrdered())
throw new IgniteCheckedException("Discovery SPI implementation does not support node ordering and " +
"cannot be used with cache (use SPI with @GridDiscoverySpiOrderSupport annotation, " +
"like GridTcpDiscoverySpi)");
@@ -1716,18 +1579,64 @@ public class IgnitionEx {
// No-op.
}
- // Ensure that SPIs support multiple grid instances, if required.
- if (!startCtx.single()) {
- ensureMultiInstanceSupport(deploySpi);
- ensureMultiInstanceSupport(commSpi);
- ensureMultiInstanceSupport(discoSpi);
- ensureMultiInstanceSupport(cpSpi);
- ensureMultiInstanceSupport(evtSpi);
- ensureMultiInstanceSupport(colSpi);
- ensureMultiInstanceSupport(failSpi);
- ensureMultiInstanceSupport(loadBalancingSpi);
- ensureMultiInstanceSupport(swapspaceSpi);
- }
+ execSvc = new IgniteThreadPoolExecutor(
+ "pub-" + cfg.getGridName(),
+ cfg.getPublicThreadPoolSize(),
+ cfg.getPublicThreadPoolSize(),
+ DFLT_PUBLIC_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
+
+ // Pre-start all threads as they are guaranteed to be needed.
+ ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
+
+ // Note that since we use 'LinkedBlockingQueue', number of
+ // maximum threads has no effect.
+ sysExecSvc = new IgniteThreadPoolExecutor(
+ "sys-" + cfg.getGridName(),
+ cfg.getSystemThreadPoolSize(),
+ cfg.getSystemThreadPoolSize(),
+ DFLT_SYSTEM_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+
+ // Pre-start all threads as they are guaranteed to be needed.
+ ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
+
+ // Note that since we use 'LinkedBlockingQueue', number of
+ // maximum threads has no effect.
+ // Note, that we do not pre-start threads here as management pool may
+ // not be needed.
+ mgmtExecSvc = new IgniteThreadPoolExecutor(
+ "mgmt-" + cfg.getGridName(),
+ cfg.getManagementThreadPoolSize(),
+ cfg.getManagementThreadPoolSize(),
+ 0,
+ new LinkedBlockingQueue<Runnable>());
+
+ // Note that since we use 'LinkedBlockingQueue', number of
+ // maximum threads has no effect.
+ // Note, that we do not pre-start threads here as class loading pool may
+ // not be needed.
+ p2pExecSvc = new IgniteThreadPoolExecutor(
+ "p2p-" + cfg.getGridName(),
+ cfg.getPeerClassLoadingThreadPoolSize(),
+ cfg.getPeerClassLoadingThreadPoolSize(),
+ 0,
+ new LinkedBlockingQueue<Runnable>());
+
+ // Note that we do not pre-start threads here as igfs pool may not be needed.
+ igfsExecSvc = new IgniteThreadPoolExecutor(
+ "igfs-" + cfg.getGridName(),
+ cfg.getIgfsThreadPoolSize(),
+ cfg.getIgfsThreadPoolSize(),
+ 0,
+ new LinkedBlockingQueue<Runnable>());
+
+ utilityCacheExecSvc = new IgniteThreadPoolExecutor(
+ "utility-" + cfg.getGridName(),
+ DFLT_SYSTEM_CORE_THREAD_CNT,
+ DFLT_SYSTEM_MAX_THREAD_CNT,
+ DFLT_SYSTEM_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
// Register Ignite MBean for current grid instance.
registerFactoryMbean(myCfg.getMBeanServer());
@@ -1800,6 +1709,99 @@ public class IgnitionEx {
}
}
+ private void copySpis(IgniteConfiguration cfg, IgniteConfiguration myCfg, boolean singleGrid)
+ throws IgniteCheckedException {
+ /*
+ * Initialize default SPI implementations.
+ */
+ CommunicationSpi commSpi = cfg.getCommunicationSpi();
+ DiscoverySpi discoSpi = cfg.getDiscoverySpi();
+ EventStorageSpi evtSpi = cfg.getEventStorageSpi();
+ CollisionSpi colSpi = cfg.getCollisionSpi();
+ DeploymentSpi deploySpi = cfg.getDeploymentSpi();
+ CheckpointSpi[] cpSpi = cfg.getCheckpointSpi();
+ FailoverSpi[] failSpi = cfg.getFailoverSpi();
+ LoadBalancingSpi[] loadBalancingSpi = cfg.getLoadBalancingSpi();
+ SwapSpaceSpi swapspaceSpi = cfg.getSwapSpaceSpi();
+ IndexingSpi indexingSpi = cfg.getIndexingSpi();
+
+ if (commSpi == null)
+ commSpi = new TcpCommunicationSpi();
+
+ if (discoSpi == null)
+ discoSpi = new TcpDiscoverySpi();
+
+ if (discoSpi instanceof TcpDiscoverySpi) {
+ TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi)discoSpi;
+
+ if (tcpDisco.getIpFinder() == null)
+ tcpDisco.setIpFinder(new TcpDiscoveryMulticastIpFinder());
+ }
+
+ if (evtSpi == null)
+ evtSpi = new MemoryEventStorageSpi();
+
+ if (colSpi == null)
+ colSpi = new NoopCollisionSpi();
+
+ if (deploySpi == null)
+ deploySpi = new LocalDeploymentSpi();
+
+ if (cpSpi == null)
+ cpSpi = new CheckpointSpi[] {new NoopCheckpointSpi()};
+
+ if (failSpi == null)
+ failSpi = new FailoverSpi[] {new AlwaysFailoverSpi()};
+
+ if (loadBalancingSpi == null)
+ loadBalancingSpi = new LoadBalancingSpi[] {new RoundRobinLoadBalancingSpi()};
+
+ if (swapspaceSpi == null) {
+ boolean needSwap = false;
+
+ CacheConfiguration[] caches = cfg.getCacheConfiguration();
+
+ if (caches != null) {
+ for (CacheConfiguration c : caches) {
+ if (c.isSwapEnabled()) {
+ needSwap = true;
+
+ break;
+ }
+ }
+ }
+
+ swapspaceSpi = needSwap ? new FileSwapSpaceSpi() : new NoopSwapSpaceSpi();
+ }
+
+ if (indexingSpi == null)
+ indexingSpi = new NoopIndexingSpi();
+
+ myCfg.setCommunicationSpi(commSpi);
+ myCfg.setDiscoverySpi(discoSpi);
+ myCfg.setCheckpointSpi(cpSpi);
+ myCfg.setEventStorageSpi(evtSpi);
+ myCfg.setDeploymentSpi(deploySpi);
+ myCfg.setFailoverSpi(failSpi);
+ myCfg.setCollisionSpi(colSpi);
+ myCfg.setLoadBalancingSpi(loadBalancingSpi);
+ myCfg.setSwapSpaceSpi(swapspaceSpi);
+ myCfg.setIndexingSpi(indexingSpi);
+
+ // Ensure that SPIs support multiple grid instances, if required.
+ if (!singleGrid) {
+ ensureMultiInstanceSupport(deploySpi);
+ ensureMultiInstanceSupport(commSpi);
+ ensureMultiInstanceSupport(discoSpi);
+ ensureMultiInstanceSupport(cpSpi);
+ ensureMultiInstanceSupport(evtSpi);
+ ensureMultiInstanceSupport(colSpi);
+ ensureMultiInstanceSupport(failSpi);
+ ensureMultiInstanceSupport(loadBalancingSpi);
+ ensureMultiInstanceSupport(swapspaceSpi);
+ }
+ }
+
/**
* @param cfgLog Configured logger.
* @param nodeId Local node ID.