You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/14 12:41:47 UTC
incubator-ignite git commit: # ignite-709_3 do not create system
caches on client nodes
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-709_3 [created] 63261f586
# ignite-709_3 do not create system caches on client nodes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/63261f58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/63261f58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/63261f58
Branch: refs/heads/ignite-709_3
Commit: 63261f5866695ddfbbca3f9d5508f4d7964263dc
Parents: 4ba5671
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 14 13:07:44 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 14 13:35:34 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/IgniteEx.java | 3 +-
.../apache/ignite/internal/IgniteKernal.java | 3 +-
.../org/apache/ignite/internal/IgnitionEx.java | 16 ++--
.../processors/cache/GridCacheProcessor.java | 94 +++++++++++++++-----
.../processors/cache/GridCacheUtils.java | 25 ++++--
.../GridDhtPartitionsExchangeFuture.java | 19 +++-
6 files changed, 112 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63261f58/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
index 4845d51..bc7e722 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
@@ -35,8 +35,9 @@ public interface IgniteEx extends Ignite {
* Gets utility cache.
*
* @return Utility cache.
+ * @throws IgniteCheckedException If failed.
*/
- public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache();
+ public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() throws IgniteCheckedException;
/**
* Gets the cache instance for the given name if one is configured or
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63261f58/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index d98b023..1c68a82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2462,7 +2462,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/** {@inheritDoc} */
- @Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() {
+ @Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache()
+ throws IgniteCheckedException {
guard();
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63261f58/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 0807d09..9c7340e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1822,20 +1822,14 @@ public class IgnitionEx {
public void initializeDefaultCacheConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException {
List<CacheConfiguration> cacheCfgs = new ArrayList<>();
- boolean clientDisco = cfg.getDiscoverySpi() instanceof TcpClientDiscoverySpi;
+ cacheCfgs.add(marshallerSystemCache());
- // Add marshaller and utility caches.
- if (!clientDisco) {
- cacheCfgs.add(marshallerSystemCache());
-
- cacheCfgs.add(utilitySystemCache());
- }
+ cacheCfgs.add(utilitySystemCache());
if (IgniteComponentType.HADOOP.inClassPath())
cacheCfgs.add(CU.hadoopSystemCache());
- if (cfg.getAtomicConfiguration() != null && !clientDisco)
- cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration()));
+ cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration()));
CacheConfiguration[] userCaches = cfg.getCacheConfiguration();
@@ -2021,7 +2015,7 @@ public class IgnitionEx {
cache.setRebalanceMode(SYNC);
cache.setWriteSynchronizationMode(FULL_SYNC);
cache.setAffinity(new RendezvousAffinityFunction(false, 20));
- cache.setNodeFilter(CacheConfiguration.ALL_NODES);
+ cache.setNodeFilter(CacheConfiguration.SERVER_NODES);
cache.setStartSize(300);
return cache;
@@ -2042,7 +2036,7 @@ public class IgnitionEx {
cache.setRebalanceMode(SYNC);
cache.setWriteSynchronizationMode(FULL_SYNC);
cache.setAffinity(new RendezvousAffinityFunction(false, 100));
- cache.setNodeFilter(CacheConfiguration.ALL_NODES);
+ cache.setNodeFilter(CacheConfiguration.SERVER_NODES);
return cache;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63261f58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0e1a9c2..86d80c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -733,13 +733,26 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cacheStartedLatch.countDown();
}
- ctx.marshallerContext().onMarshallerCacheStarted(ctx);
+ if (marshallerCache() == null) {
+ assert ctx.config().isClientMode() : "Marshaller cache is missed on server node.";
- marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() {
- @Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException {
- ctx.marshallerContext().onMarshallerCachePreloaded(ctx);
- }
- });
+ IgniteInternalFuture<?> fut = startCacheAsync(CU.MARSH_CACHE_NAME, true);
+
+ assert fut != null;
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ try {
+ marshallerCacheCallbacks();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to initialize marshaller context.", e);
+ }
+ }
+ });
+ }
+ else
+ marshallerCacheCallbacks();
// Must call onKernalStart on shared managers after creation of fetched caches.
for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
@@ -788,6 +801,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.cacheObjects().onCacheProcessorStarted();
}
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void marshallerCacheCallbacks() throws IgniteCheckedException {
+ ctx.marshallerContext().onMarshallerCacheStarted(ctx);
+
+ marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() {
+ @Override
+ public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException {
+ ctx.marshallerContext().onMarshallerCachePreloaded(ctx);
+ }
+ });
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void stop(boolean cancel) throws IgniteCheckedException {
@@ -2499,18 +2526,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Gets utility cache.
*
* @return Utility cache.
+ * @throws IgniteCheckedException If failed.
*/
- public <K, V> GridCacheAdapter<K, V> utilityCache() {
- return internalCache(CU.UTILITY_CACHE_NAME);
- }
+ public <K, V> GridCacheAdapter<K, V> utilityCache() throws IgniteCheckedException {
+ GridCacheAdapter<K, V> cache = internalCache(CU.UTILITY_CACHE_NAME);
- /**
- * Gets utility cache for atomic data structures.
- *
- * @return Utility cache for atomic data structures.
- */
- public <K, V> IgniteInternalCache<K, V> atomicsCache() {
- return cache(CU.ATOMICS_CACHE_NAME);
+ if (cache != null)
+ return cache;
+
+ assert ctx.config().isClientMode() : "Utility cache is missed on server node.";
+
+ getOrStartCache(CU.UTILITY_CACHE_NAME);
+
+ return internalCache(CU.UTILITY_CACHE_NAME);
}
/**
@@ -2590,6 +2618,31 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
private IgniteCache startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException {
+ IgniteInternalFuture<?> fut = startCacheAsync(cacheName, failIfNotStarted);
+
+ if (fut != null) {
+ fut.get();
+
+ String masked = maskNull(cacheName);
+
+ IgniteCache cache = jCacheProxies.get(masked);
+
+ if (cache == null && failIfNotStarted)
+ throw new IllegalArgumentException("Cache is not started: " + cacheName);
+
+ return cache;
+ }
+
+ return null;
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started,
+ * otherwise returns {@code null} in this case.
+ * @return Future.
+ */
+ @Nullable private IgniteInternalFuture<?> startCacheAsync(String cacheName, boolean failIfNotStarted) {
String masked = maskNull(cacheName);
DynamicCacheDescriptor desc = registeredCaches.get(masked);
@@ -2617,14 +2670,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.clientStartOnly(true);
- F.first(initiateCacheChanges(F.asList(req))).get();
-
- IgniteCache cache = jCacheProxies.get(masked);
-
- if (cache == null && failIfNotStarted)
- throw new IllegalArgumentException("Cache is not started: " + cacheName);
-
- return cache;
+ return F.first(initiateCacheChanges(F.asList(req)));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63261f58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 549f42f..e502b54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -651,7 +651,7 @@ public class GridCacheUtils {
* @return Oldest node for the given topology version.
*/
public static ClusterNode oldest(GridCacheSharedContext cctx, AffinityTopologyVersion topOrder) {
- ClusterNode oldest = null;
+ ClusterNode oldest = oldest(aliveCacheNodes(cctx, topOrder));
for (ClusterNode n : aliveCacheNodes(cctx, topOrder)) {
if (oldest == null || n.order() < oldest.order())
@@ -665,6 +665,21 @@ public class GridCacheUtils {
}
/**
+ * @param nodes Nodes.
+ * @return Oldest node for the given topology version.
+ */
+ @Nullable public static ClusterNode oldest(Collection<ClusterNode> nodes) {
+ ClusterNode oldest = null;
+
+ for (ClusterNode n : nodes) {
+ if (oldest == null || n.order() < oldest.order())
+ oldest = n;
+ }
+
+ return oldest;
+ }
+
+ /**
* @return Empty filter.
*/
@SuppressWarnings({"unchecked"})
@@ -1451,13 +1466,7 @@ public class GridCacheUtils {
}
/**
- * @return Cache ID for utility cache.
- */
- public static int utilityCacheId() {
- return cacheId(UTILITY_CACHE_NAME);
- }
-
- /**
+ * @param cacheName Cache name.
* @return Cache ID.
*/
public static int cacheId(String cacheName) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63261f58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 45d332c..120c537 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -221,12 +221,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
log = cctx.logger(getClass());
+ initFut = new GridFutureAdapter<>();
+
// Grab all nodes with order of equal or less than last joined node.
- oldestNode.set(CU.oldest(cctx, exchId.topologyVersion()));
+ Collection<ClusterNode> nodes = CU.aliveCacheNodes(cctx, exchId.topologyVersion());
- assert oldestNode.get() != null;
+ if (nodes.isEmpty()) {
+ initFut.onDone(true);
- initFut = new GridFutureAdapter<>();
+ onDone(exchId.topologyVersion());
+
+ return;
+ }
+
+ oldestNode.set(CU.oldest(nodes));
+
+ assert oldestNode.get() != null;
if (log.isDebugEnabled())
log.debug("Creating exchange future [localNode=" + cctx.localNodeId() +
@@ -444,6 +454,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @throws IgniteInterruptedCheckedException If interrupted.
*/
public void init() throws IgniteInterruptedCheckedException {
+ if (isDone())
+ return;
+
assert oldestNode.get() != null;
if (init.compareAndSet(false, true)) {