You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/03/02 17:29:52 UTC

[06/50] incubator-ignite git commit: #ignite-239: small refactoring.

#ignite-239: small refactoring.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c1b4ad70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c1b4ad70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c1b4ad70

Branch: refs/heads/ignite-368
Commit: c1b4ad70cf1d1d954981c1056e4e66b021c465c7
Parents: 7e2b2e2
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Feb 20 18:29:11 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Feb 20 18:29:11 2015 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |   4 +-
 .../org/apache/ignite/internal/IgnitionEx.java  | 352 ++++++++++---------
 2 files changed, 179 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b4ad70/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index e554aea..d44b057 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -384,7 +384,6 @@ public class IgniteConfiguration {
      *
      * @param cfg Grid configuration to copy from.
      */
-    @SuppressWarnings("deprecation")
     public IgniteConfiguration(IgniteConfiguration cfg) {
         assert cfg != null;
 
@@ -455,7 +454,8 @@ public class IgniteConfiguration {
         sysPoolSize = cfg.getSystemThreadPoolSize();
         timeSrvPortBase = cfg.getTimeServerPortBase();
         timeSrvPortRange = cfg.getTimeServerPortRange();
-        txCfg = cfg.getTransactionConfiguration();
+        txCfg = cfg.getTransactionConfiguration() != null ?
+            new TransactionConfiguration(cfg.getTransactionConfiguration()) : null;
         userAttrs = cfg.getUserAttributes();
         waitForSegOnStart = cfg.isWaitForSegmentOnStart();
         warmupClos = cfg.getWarmupClosure();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b4ad70/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 34f29a7..9c196f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1333,6 +1333,8 @@ public class IgnitionEx {
             if (nodeId == null)
                 nodeId = UUID.randomUUID();
 
+            myCfg.setNodeId(nodeId);
+
             IgniteLogger cfgLog = initLogger(cfg.getGridLogger(), nodeId);
 
             assert cfgLog != null;
@@ -1342,6 +1344,8 @@ public class IgnitionEx {
             // Initialize factory's log.
             log = cfgLog.getLogger(G.class);
 
+            myCfg.setGridLogger(cfgLog);
+
             // Check Ignite home folder (after log is available).
             if (ggHome != null) {
                 File ggHomeFile = new File(ggHome);
@@ -1352,13 +1356,6 @@ public class IgnitionEx {
 
             myCfg.setIgniteHome(ggHome);
 
-            myCfg.setTransactionConfiguration(new TransactionConfiguration(cfg.getTransactionConfiguration()));
-
-            ConnectorConfiguration clientCfg = cfg.getConnectorConfiguration();
-
-            if (clientCfg != null)
-                clientCfg = new ConnectorConfiguration(clientCfg);
-
             // Local host.
             String locHost = IgniteSystemProperties.getString(IGNITE_LOCAL_HOST);
 
@@ -1391,69 +1388,10 @@ public class IgnitionEx {
                 }
             }
 
-            Map<String, ?> attrs = cfg.getUserAttributes();
-
-            if (attrs == null)
-                attrs = Collections.emptyMap();
-
-            MBeanServer mbSrv = cfg.getMBeanServer();
-
-            Marshaller marsh = cfg.getMarshaller();
-
-            String[] p2pExclude = cfg.getPeerClassLoadingLocalClassPathExclude();
-
-
-            execSvc = new IgniteThreadPoolExecutor(
-                "pub-" + cfg.getGridName(),
-                cfg.getPublicThreadPoolSize(),
-                cfg.getPublicThreadPoolSize(),
-                DFLT_PUBLIC_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
-
-            // Pre-start all threads as they are guaranteed to be needed.
-            ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
-
-            // Note that since we use 'LinkedBlockingQueue', number of
-            // maximum threads has no effect.
-            sysExecSvc = new IgniteThreadPoolExecutor(
-                "sys-" + cfg.getGridName(),
-                cfg.getSystemThreadPoolSize(),
-                cfg.getSystemThreadPoolSize(),
-                DFLT_SYSTEM_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
-
-            // Pre-start all threads as they are guaranteed to be needed.
-            ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
-
-            // Note that since we use 'LinkedBlockingQueue', number of
-            // maximum threads has no effect.
-            // Note, that we do not pre-start threads here as management pool may
-            // not be needed.
-            mgmtExecSvc = new IgniteThreadPoolExecutor(
-                "mgmt-" + cfg.getGridName(),
-                cfg.getManagementThreadPoolSize(),
-                cfg.getManagementThreadPoolSize(),
-                0,
-                new LinkedBlockingQueue<Runnable>());
-
-            // Note that since we use 'LinkedBlockingQueue', number of
-            // maximum threads has no effect.
-            // Note, that we do not pre-start threads here as class loading pool may
-            // not be needed.
-            p2pExecSvc = new IgniteThreadPoolExecutor(
-                "p2p-" + cfg.getGridName(),
-                cfg.getPeerClassLoadingThreadPoolSize(),
-                cfg.getPeerClassLoadingThreadPoolSize(),
-                0,
-                new LinkedBlockingQueue<Runnable>());
+            ConnectorConfiguration clientCfg = cfg.getConnectorConfiguration();
 
-            // Note that we do not pre-start threads here as igfs pool may not be needed.
-            igfsExecSvc = new IgniteThreadPoolExecutor(
-                "igfs-" + cfg.getGridName(),
-                cfg.getIgfsThreadPoolSize(),
-                cfg.getIgfsThreadPoolSize(),
-                0,
-                new LinkedBlockingQueue<Runnable>());
+            if (clientCfg != null)
+                clientCfg = new ConnectorConfiguration(clientCfg);
 
             if (clientCfg != null) {
                 restExecSvc = new IgniteThreadPoolExecutor(
@@ -1465,12 +1403,10 @@ public class IgnitionEx {
                 );
             }
 
-            utilityCacheExecSvc = new IgniteThreadPoolExecutor(
-                "utility-" + cfg.getGridName(),
-                DFLT_SYSTEM_CORE_THREAD_CNT,
-                DFLT_SYSTEM_MAX_THREAD_CNT,
-                DFLT_SYSTEM_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+            // REST configuration.
+            myCfg.setConnectorConfiguration(clientCfg);
+
+            Marshaller marsh = cfg.getMarshaller();
 
             if (marsh == null) {
                 if (!U.isHotSpot()) {
@@ -1500,12 +1436,18 @@ public class IgnitionEx {
                     "Using GridOptimizedMarshaller on untested JVM.");
             }
 
+            myCfg.setMarshaller(marsh);
+
+            Map<String, ?> attrs = cfg.getUserAttributes();
+
+            if (attrs == null)
+                attrs = Collections.emptyMap();
+
             myCfg.setUserAttributes(attrs);
+
+            MBeanServer mbSrv = cfg.getMBeanServer();
+
             myCfg.setMBeanServer(mbSrv == null ? ManagementFactory.getPlatformMBeanServer() : mbSrv);
-            myCfg.setGridLogger(cfgLog);
-            myCfg.setMarshaller(marsh);
-            myCfg.setMarshalLocalJobs(cfg.isMarshalLocalJobs());
-            myCfg.setNodeId(nodeId);
 
             IgfsConfiguration[] igfsCfgs = cfg.getIgfsConfiguration();
 
@@ -1529,14 +1471,13 @@ public class IgnitionEx {
                 myCfg.setStreamerConfiguration(clone);
             }
 
+            String[] p2pExclude = cfg.getPeerClassLoadingLocalClassPathExclude();
+
             if (p2pExclude == null)
                 p2pExclude = EMPTY_STR_ARR;
 
             myCfg.setPeerClassLoadingLocalClassPathExclude(p2pExclude);
 
-            // REST configuration.
-            myCfg.setConnectorConfiguration(clientCfg);
-
             // Validate segmentation configuration.
             GridSegmentationPolicy segPlc = cfg.getSegmentationPolicy();
 
@@ -1547,85 +1488,7 @@ public class IgnitionEx {
                     "on start?) [segPlc=" + segPlc + ", wait=false]");
             }
 
-
-
-            /*
-             * Initialize default SPI implementations.
-             */
-            CommunicationSpi commSpi = cfg.getCommunicationSpi();
-            DiscoverySpi discoSpi = cfg.getDiscoverySpi();
-            EventStorageSpi evtSpi = cfg.getEventStorageSpi();
-            CollisionSpi colSpi = cfg.getCollisionSpi();
-            DeploymentSpi deploySpi = cfg.getDeploymentSpi();
-            CheckpointSpi[] cpSpi = cfg.getCheckpointSpi();
-            FailoverSpi[] failSpi = cfg.getFailoverSpi();
-            LoadBalancingSpi[] loadBalancingSpi = cfg.getLoadBalancingSpi();
-            SwapSpaceSpi swapspaceSpi = cfg.getSwapSpaceSpi();
-            IndexingSpi indexingSpi = cfg.getIndexingSpi();
-
-
-            if (commSpi == null)
-                commSpi = new TcpCommunicationSpi();
-
-            if (discoSpi == null)
-                discoSpi = new TcpDiscoverySpi();
-
-            if (discoSpi instanceof TcpDiscoverySpi) {
-                TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi)discoSpi;
-
-                if (tcpDisco.getIpFinder() == null)
-                    tcpDisco.setIpFinder(new TcpDiscoveryMulticastIpFinder());
-            }
-
-            if (evtSpi == null)
-                evtSpi = new MemoryEventStorageSpi();
-
-            if (colSpi == null)
-                colSpi = new NoopCollisionSpi();
-
-            if (deploySpi == null)
-                deploySpi = new LocalDeploymentSpi();
-
-            if (cpSpi == null)
-                cpSpi = new CheckpointSpi[] {new NoopCheckpointSpi()};
-
-            if (failSpi == null)
-                failSpi = new FailoverSpi[] {new AlwaysFailoverSpi()};
-
-            if (loadBalancingSpi == null)
-                loadBalancingSpi = new LoadBalancingSpi[] {new RoundRobinLoadBalancingSpi()};
-
-            if (swapspaceSpi == null) {
-                boolean needSwap = false;
-
-                CacheConfiguration[] caches = cfg.getCacheConfiguration();
-
-                if (caches != null) {
-                    for (CacheConfiguration c : caches) {
-                        if (c.isSwapEnabled()) {
-                            needSwap = true;
-
-                            break;
-                        }
-                    }
-                }
-
-                swapspaceSpi = needSwap ? new FileSwapSpaceSpi() : new NoopSwapSpaceSpi();
-            }
-
-            if (indexingSpi == null)
-                indexingSpi = new NoopIndexingSpi();
-
-            myCfg.setCommunicationSpi(commSpi);
-            myCfg.setDiscoverySpi(discoSpi);
-            myCfg.setCheckpointSpi(cpSpi);
-            myCfg.setEventStorageSpi(evtSpi);
-            myCfg.setDeploymentSpi(deploySpi);
-            myCfg.setFailoverSpi(failSpi);
-            myCfg.setCollisionSpi(colSpi);
-            myCfg.setLoadBalancingSpi(loadBalancingSpi);
-            myCfg.setSwapSpaceSpi(swapspaceSpi);
-            myCfg.setIndexingSpi(indexingSpi);
+            copySpis(cfg, myCfg, startCtx.single());
 
             CacheConfiguration[] cacheCfgs = cfg.getCacheConfiguration();
 
@@ -1633,12 +1496,12 @@ public class IgnitionEx {
 
             final boolean hasAtomics = cfg.getAtomicConfiguration() != null;
 
-            final boolean clientDisco = discoSpi instanceof TcpClientDiscoverySpi;
+            final boolean clientDisco = myCfg.getDiscoverySpi() instanceof TcpClientDiscoverySpi;
 
             CacheConfiguration[] copies;
 
             if (cacheCfgs != null && cacheCfgs.length > 0) {
-                if (!U.discoOrdered(discoSpi) && !U.relaxDiscoveryOrdered())
+                if (!U.discoOrdered(myCfg.getDiscoverySpi()) && !U.relaxDiscoveryOrdered())
                     throw new IgniteCheckedException("Discovery SPI implementation does not support node ordering and " +
                         "cannot be used with cache (use SPI with @GridDiscoverySpiOrderSupport annotation, " +
                         "like GridTcpDiscoverySpi)");
@@ -1716,18 +1579,64 @@ public class IgnitionEx {
                 // No-op.
             }
 
-            // Ensure that SPIs support multiple grid instances, if required.
-            if (!startCtx.single()) {
-                ensureMultiInstanceSupport(deploySpi);
-                ensureMultiInstanceSupport(commSpi);
-                ensureMultiInstanceSupport(discoSpi);
-                ensureMultiInstanceSupport(cpSpi);
-                ensureMultiInstanceSupport(evtSpi);
-                ensureMultiInstanceSupport(colSpi);
-                ensureMultiInstanceSupport(failSpi);
-                ensureMultiInstanceSupport(loadBalancingSpi);
-                ensureMultiInstanceSupport(swapspaceSpi);
-            }
+            execSvc = new IgniteThreadPoolExecutor(
+                "pub-" + cfg.getGridName(),
+                cfg.getPublicThreadPoolSize(),
+                cfg.getPublicThreadPoolSize(),
+                DFLT_PUBLIC_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
+
+            // Pre-start all threads as they are guaranteed to be needed.
+            ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
+
+            // Note that since we use 'LinkedBlockingQueue', number of
+            // maximum threads has no effect.
+            sysExecSvc = new IgniteThreadPoolExecutor(
+                "sys-" + cfg.getGridName(),
+                cfg.getSystemThreadPoolSize(),
+                cfg.getSystemThreadPoolSize(),
+                DFLT_SYSTEM_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+
+            // Pre-start all threads as they are guaranteed to be needed.
+            ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
+
+            // Note that since we use 'LinkedBlockingQueue', number of
+            // maximum threads has no effect.
+            // Note, that we do not pre-start threads here as management pool may
+            // not be needed.
+            mgmtExecSvc = new IgniteThreadPoolExecutor(
+                "mgmt-" + cfg.getGridName(),
+                cfg.getManagementThreadPoolSize(),
+                cfg.getManagementThreadPoolSize(),
+                0,
+                new LinkedBlockingQueue<Runnable>());
+
+            // Note that since we use 'LinkedBlockingQueue', number of
+            // maximum threads has no effect.
+            // Note, that we do not pre-start threads here as class loading pool may
+            // not be needed.
+            p2pExecSvc = new IgniteThreadPoolExecutor(
+                "p2p-" + cfg.getGridName(),
+                cfg.getPeerClassLoadingThreadPoolSize(),
+                cfg.getPeerClassLoadingThreadPoolSize(),
+                0,
+                new LinkedBlockingQueue<Runnable>());
+
+            // Note that we do not pre-start threads here as igfs pool may not be needed.
+            igfsExecSvc = new IgniteThreadPoolExecutor(
+                "igfs-" + cfg.getGridName(),
+                cfg.getIgfsThreadPoolSize(),
+                cfg.getIgfsThreadPoolSize(),
+                0,
+                new LinkedBlockingQueue<Runnable>());
+
+            utilityCacheExecSvc = new IgniteThreadPoolExecutor(
+                "utility-" + cfg.getGridName(),
+                DFLT_SYSTEM_CORE_THREAD_CNT,
+                DFLT_SYSTEM_MAX_THREAD_CNT,
+                DFLT_SYSTEM_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
 
             // Register Ignite MBean for current grid instance.
             registerFactoryMbean(myCfg.getMBeanServer());
@@ -1800,6 +1709,99 @@ public class IgnitionEx {
             }
         }
 
+        private void copySpis(IgniteConfiguration cfg, IgniteConfiguration myCfg, boolean singleGrid)
+            throws IgniteCheckedException {
+            /*
+             * Initialize default SPI implementations.
+             */
+            CommunicationSpi commSpi = cfg.getCommunicationSpi();
+            DiscoverySpi discoSpi = cfg.getDiscoverySpi();
+            EventStorageSpi evtSpi = cfg.getEventStorageSpi();
+            CollisionSpi colSpi = cfg.getCollisionSpi();
+            DeploymentSpi deploySpi = cfg.getDeploymentSpi();
+            CheckpointSpi[] cpSpi = cfg.getCheckpointSpi();
+            FailoverSpi[] failSpi = cfg.getFailoverSpi();
+            LoadBalancingSpi[] loadBalancingSpi = cfg.getLoadBalancingSpi();
+            SwapSpaceSpi swapspaceSpi = cfg.getSwapSpaceSpi();
+            IndexingSpi indexingSpi = cfg.getIndexingSpi();
+
+            if (commSpi == null)
+                commSpi = new TcpCommunicationSpi();
+
+            if (discoSpi == null)
+                discoSpi = new TcpDiscoverySpi();
+
+            if (discoSpi instanceof TcpDiscoverySpi) {
+                TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi)discoSpi;
+
+                if (tcpDisco.getIpFinder() == null)
+                    tcpDisco.setIpFinder(new TcpDiscoveryMulticastIpFinder());
+            }
+
+            if (evtSpi == null)
+                evtSpi = new MemoryEventStorageSpi();
+
+            if (colSpi == null)
+                colSpi = new NoopCollisionSpi();
+
+            if (deploySpi == null)
+                deploySpi = new LocalDeploymentSpi();
+
+            if (cpSpi == null)
+                cpSpi = new CheckpointSpi[] {new NoopCheckpointSpi()};
+
+            if (failSpi == null)
+                failSpi = new FailoverSpi[] {new AlwaysFailoverSpi()};
+
+            if (loadBalancingSpi == null)
+                loadBalancingSpi = new LoadBalancingSpi[] {new RoundRobinLoadBalancingSpi()};
+
+            if (swapspaceSpi == null) {
+                boolean needSwap = false;
+
+                CacheConfiguration[] caches = cfg.getCacheConfiguration();
+
+                if (caches != null) {
+                    for (CacheConfiguration c : caches) {
+                        if (c.isSwapEnabled()) {
+                            needSwap = true;
+
+                            break;
+                        }
+                    }
+                }
+
+                swapspaceSpi = needSwap ? new FileSwapSpaceSpi() : new NoopSwapSpaceSpi();
+            }
+
+            if (indexingSpi == null)
+                indexingSpi = new NoopIndexingSpi();
+
+            myCfg.setCommunicationSpi(commSpi);
+            myCfg.setDiscoverySpi(discoSpi);
+            myCfg.setCheckpointSpi(cpSpi);
+            myCfg.setEventStorageSpi(evtSpi);
+            myCfg.setDeploymentSpi(deploySpi);
+            myCfg.setFailoverSpi(failSpi);
+            myCfg.setCollisionSpi(colSpi);
+            myCfg.setLoadBalancingSpi(loadBalancingSpi);
+            myCfg.setSwapSpaceSpi(swapspaceSpi);
+            myCfg.setIndexingSpi(indexingSpi);
+
+            // Ensure that SPIs support multiple grid instances, if required.
+            if (!singleGrid) {
+                ensureMultiInstanceSupport(deploySpi);
+                ensureMultiInstanceSupport(commSpi);
+                ensureMultiInstanceSupport(discoSpi);
+                ensureMultiInstanceSupport(cpSpi);
+                ensureMultiInstanceSupport(evtSpi);
+                ensureMultiInstanceSupport(colSpi);
+                ensureMultiInstanceSupport(failSpi);
+                ensureMultiInstanceSupport(loadBalancingSpi);
+                ensureMultiInstanceSupport(swapspaceSpi);
+            }
+        }
+
         /**
          * @param cfgLog Configured logger.
          * @param nodeId Local node ID.