You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/03/05 03:54:56 UTC
[4/4] incubator-ignite git commit: IGNITE-45 - WIP
IGNITE-45 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/62d39de2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/62d39de2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/62d39de2
Branch: refs/heads/ignite-45
Commit: 62d39de2acfa00d8cf6abf1d1596d9e0116bc534
Parents: c0e60f8
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Mar 4 18:54:47 2015 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Mar 4 18:54:47 2015 -0800
----------------------------------------------------------------------
.../apache/ignite/internal/GridComponent.java | 3 +
.../discovery/GridDiscoveryManager.java | 4 +-
.../processors/cache/GridCacheProcessor.java | 84 +++++++++++++++++---
.../processors/cache/GridCacheUtils.java | 2 +-
.../cache/IgniteDynamicCacheStartSelfTest.java | 9 ++-
5 files changed, 87 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index d496c9a..e226bf7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -36,6 +36,9 @@ public interface GridComponent {
CONTINUOUS_PROC,
/** */
+ CACHE_PROC,
+
+ /** */
PLUGIN
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 04ca3d0..e510303 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -224,9 +224,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param filter Cache filter.
*/
public void addDynamicCacheFilter(String cacheName, IgnitePredicate<ClusterNode> filter) {
- IgnitePredicate<ClusterNode> old = dynamicCacheFilters.put(cacheName, filter);
-
- assert old == null;
+ dynamicCacheFilters.put(cacheName, filter);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index b7ac0af..ea7cb9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -663,6 +663,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (ctx.config().isDaemon())
return;
+ // Start dynamic caches received from collect discovery data.
+ for (DynamicCacheDescriptor desc : dynamicCaches.values()) {
+ GridCacheContext ctx = createCache(desc.cacheConfiguration());
+
+ sharedCtx.addCacheContext(ctx);
+
+ GridCacheAdapter cache = ctx.cache();
+
+ String name = desc.cacheConfiguration().getName();
+
+ caches.put(name, cache);
+
+ startCache(cache);
+
+ proxies.put(name, new GridCacheProxyImpl(ctx, cache, null));
+
+ jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false));
+ }
+
if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
for (ClusterNode n : ctx.discovery().remoteNodes())
checkCache(n);
@@ -1182,11 +1201,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
public void prepareCacheStart(DynamicCacheChangeRequest req) throws IgniteCheckedException {
assert req.isStart();
- CacheConfiguration cfg = new CacheConfiguration(req.startCacheConfiguration());
-
- initialize(cfg);
-
- GridCacheContext cacheCtx = createCache(cfg);
+ GridCacheContext cacheCtx = createCache(req.startCacheConfiguration());
cacheCtx.dynamicDeploymentId(req.deploymentId());
@@ -1309,6 +1324,44 @@ public class GridCacheProcessor extends GridProcessorAdapter {
attrs.put(ATTR_CACHE_INTERCEPTORS, interceptors);
}
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+ return DiscoveryDataExchangeType.CACHE_PROC;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object collectDiscoveryData(UUID nodeId) {
+ // Collect dynamically started caches to a single object.
+ Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(dynamicCaches.size());
+
+ for (DynamicCacheDescriptor desc : dynamicCaches.values()) {
+ if (!desc.cancelled())
+ reqs.add(new DynamicCacheChangeRequest(desc.cacheConfiguration(), desc.nodeFilter()));
+ }
+
+ U.debug(log, "Collected discovery data for cache: " + reqs.size());
+
+ return new DynamicCacheChangeBatch(reqs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDiscoveryDataReceived(UUID nodeId, Object data) {
+ if (data instanceof DynamicCacheChangeBatch) {
+ DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data;
+
+ U.debug(log, "Received discovery data: " + batch.requests());
+
+ for (DynamicCacheChangeRequest req : batch.requests()) {
+ dynamicCaches.put(req.cacheName(), new DynamicCacheDescriptor(
+ req.startCacheConfiguration(),
+ req.startNodeFilter(),
+ req.deploymentId()));
+
+ ctx.discovery().addDynamicCacheFilter(req.cacheName(), req.startNodeFilter());
+ }
+ }
+ }
+
/**
* Dynamically starts cache.
*
@@ -1317,12 +1370,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Future that will be completed when cache is deployed.
*/
public IgniteInternalFuture<?> dynamicStartCache(CacheConfiguration ccfg, IgnitePredicate<ClusterNode> nodeFilter) {
- if (nodeFilter == null)
- nodeFilter = F.alwaysTrue();
+ try {
+ if (nodeFilter == null)
+ nodeFilter = F.alwaysTrue();
+
+ CacheConfiguration cfg = new CacheConfiguration(ccfg);
+
+ initialize(cfg);
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(ccfg, nodeFilter);
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cfg, nodeFilter);
- return F.first(initiateCacheChanges(F.asList(req)));
+ return F.first(initiateCacheChanges(F.asList(req)));
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFutureEx<>(e);
+ }
}
/**
@@ -1425,6 +1487,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param batch Change request batch.
*/
private void onCacheChangeRequested(DynamicCacheChangeBatch batch) {
+ U.debug(log, "<><><>Received cache change request: " + batch.requests().size());
+
for (DynamicCacheChangeRequest req : batch.requests()) {
if (req.isStart()) {
CacheConfiguration ccfg = req.startCacheConfiguration();
@@ -1471,6 +1535,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return;
}
+ U.debug(log, "Cancelling descriptor: " + desc);
+
desc.onCancelled();
ctx.discovery().removeDynamicCacheFilter(req.cacheName());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index d6974f3..43062b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -741,7 +741,7 @@ public class GridCacheUtils {
if (oldest == null || n.order() < oldest.order())
oldest = n;
- assert oldest != null;
+ assert oldest != null : "Failed to find oldest node for cache context: " + cctx.name();
assert oldest.order() <= topOrder || topOrder < 0;
return oldest;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62d39de2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 29eac7c..3b40dc3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -249,7 +249,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void _testStartStopCacheAddNode() throws Exception {
+ public void testStartStopCacheAddNode() throws Exception {
final IgniteKernal kernal = (IgniteKernal)grid(0);
CacheConfiguration ccfg = new CacheConfiguration();
@@ -271,7 +271,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
assertEquals("1", grid(g).jcache(CACHE_NAME).get("1"));
// Undeploy cache.
- kernal.context().cache().dynamicStopCache(CACHE_NAME);
+ kernal.context().cache().dynamicStopCache(CACHE_NAME).get(); // TODO debug without get().
+
+ info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+ info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+ info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+ info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
startGrid(nodeCount() + 1);