You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/03/05 03:54:56 UTC

[4/4] incubator-ignite git commit: IGNITE-45 - WIP

IGNITE-45 - WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/62d39de2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/62d39de2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/62d39de2

Branch: refs/heads/ignite-45
Commit: 62d39de2acfa00d8cf6abf1d1596d9e0116bc534
Parents: c0e60f8
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Mar 4 18:54:47 2015 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Mar 4 18:54:47 2015 -0800

----------------------------------------------------------------------
 .../apache/ignite/internal/GridComponent.java   |  3 +
 .../discovery/GridDiscoveryManager.java         |  4 +-
 .../processors/cache/GridCacheProcessor.java    | 84 +++++++++++++++++---
 .../processors/cache/GridCacheUtils.java        |  2 +-
 .../cache/IgniteDynamicCacheStartSelfTest.java  |  9 ++-
 5 files changed, 87 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index d496c9a..e226bf7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -36,6 +36,9 @@ public interface GridComponent {
         CONTINUOUS_PROC,
 
         /** */
+        CACHE_PROC,
+
+        /** */
         PLUGIN
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 04ca3d0..e510303 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -224,9 +224,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @param filter Cache filter.
      */
     public void addDynamicCacheFilter(String cacheName, IgnitePredicate<ClusterNode> filter) {
-        IgnitePredicate<ClusterNode> old = dynamicCacheFilters.put(cacheName, filter);
-
-        assert old == null;
+        dynamicCacheFilters.put(cacheName, filter);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index b7ac0af..ea7cb9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -663,6 +663,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (ctx.config().isDaemon())
             return;
 
+        // Start dynamic caches received from collect discovery data.
+        for (DynamicCacheDescriptor desc : dynamicCaches.values()) {
+            GridCacheContext ctx = createCache(desc.cacheConfiguration());
+
+            sharedCtx.addCacheContext(ctx);
+
+            GridCacheAdapter cache = ctx.cache();
+
+            String name = desc.cacheConfiguration().getName();
+
+            caches.put(name, cache);
+
+            startCache(cache);
+
+            proxies.put(name, new GridCacheProxyImpl(ctx, cache, null));
+
+            jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false));
+        }
+
         if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
             for (ClusterNode n : ctx.discovery().remoteNodes())
                 checkCache(n);
@@ -1182,11 +1201,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     public void prepareCacheStart(DynamicCacheChangeRequest req) throws IgniteCheckedException {
         assert req.isStart();
 
-        CacheConfiguration cfg = new CacheConfiguration(req.startCacheConfiguration());
-
-        initialize(cfg);
-
-        GridCacheContext cacheCtx = createCache(cfg);
+        GridCacheContext cacheCtx = createCache(req.startCacheConfiguration());
 
         cacheCtx.dynamicDeploymentId(req.deploymentId());
 
@@ -1309,6 +1324,44 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             attrs.put(ATTR_CACHE_INTERCEPTORS, interceptors);
     }
 
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return DiscoveryDataExchangeType.CACHE_PROC;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Object collectDiscoveryData(UUID nodeId) {
+        // Collect dynamically started caches to a single object.
+        Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(dynamicCaches.size());
+
+        for (DynamicCacheDescriptor desc : dynamicCaches.values()) {
+            if (!desc.cancelled())
+                reqs.add(new DynamicCacheChangeRequest(desc.cacheConfiguration(), desc.nodeFilter()));
+        }
+
+        U.debug(log, "Collected discovery data for cache: " + reqs.size());
+
+        return new DynamicCacheChangeBatch(reqs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDiscoveryDataReceived(UUID nodeId, Object data) {
+        if (data instanceof DynamicCacheChangeBatch) {
+            DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data;
+
+            U.debug(log, "Received discovery data: " + batch.requests());
+
+            for (DynamicCacheChangeRequest req : batch.requests()) {
+                dynamicCaches.put(req.cacheName(), new DynamicCacheDescriptor(
+                    req.startCacheConfiguration(),
+                    req.startNodeFilter(),
+                    req.deploymentId()));
+
+                ctx.discovery().addDynamicCacheFilter(req.cacheName(), req.startNodeFilter());
+            }
+        }
+    }
+
     /**
      * Dynamically starts cache.
      *
@@ -1317,12 +1370,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Future that will be completed when cache is deployed.
      */
     public IgniteInternalFuture<?> dynamicStartCache(CacheConfiguration ccfg, IgnitePredicate<ClusterNode> nodeFilter) {
-        if (nodeFilter == null)
-            nodeFilter = F.alwaysTrue();
+        try {
+            if (nodeFilter == null)
+                nodeFilter = F.alwaysTrue();
+
+            CacheConfiguration cfg = new CacheConfiguration(ccfg);
+
+            initialize(cfg);
 
-        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ccfg, nodeFilter);
+            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cfg, nodeFilter);
 
-        return F.first(initiateCacheChanges(F.asList(req)));
+            return F.first(initiateCacheChanges(F.asList(req)));
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFutureEx<>(e);
+        }
     }
 
     /**
@@ -1425,6 +1487,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param batch Change request batch.
      */
     private void onCacheChangeRequested(DynamicCacheChangeBatch batch) {
+        U.debug(log, "<><><>Received cache change request: " + batch.requests().size());
+
         for (DynamicCacheChangeRequest req : batch.requests()) {
             if (req.isStart()) {
                 CacheConfiguration ccfg = req.startCacheConfiguration();
@@ -1471,6 +1535,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     return;
                 }
 
+                U.debug(log, "Cancelling descriptor: " + desc);
+
                 desc.onCancelled();
 
                 ctx.discovery().removeDynamicCacheFilter(req.cacheName());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index d6974f3..43062b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -741,7 +741,7 @@ public class GridCacheUtils {
             if (oldest == null || n.order() < oldest.order())
                 oldest = n;
 
-        assert oldest != null;
+        assert oldest != null : "Failed to find oldest node for cache context: " + cctx.name();
         assert oldest.order() <= topOrder || topOrder < 0;
 
         return oldest;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 29eac7c..3b40dc3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -249,7 +249,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void _testStartStopCacheAddNode() throws Exception {
+    public void testStartStopCacheAddNode() throws Exception {
         final IgniteKernal kernal = (IgniteKernal)grid(0);
 
         CacheConfiguration ccfg = new CacheConfiguration();
@@ -271,7 +271,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
                 assertEquals("1", grid(g).jcache(CACHE_NAME).get("1"));
 
             // Undeploy cache.
-            kernal.context().cache().dynamicStopCache(CACHE_NAME);
+            kernal.context().cache().dynamicStopCache(CACHE_NAME).get(); // TODO debug without get().
+
+            info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+            info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+            info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+            info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
 
             startGrid(nodeCount() + 1);