You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 22:04:49 UTC
incubator-ignite git commit: #ignite-1009: Fix local store
configuration on clients.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1009-v2 [created] b1f4efd7f
#ignite-1009: Fix local store configuration on clients.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b1f4efd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b1f4efd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b1f4efd7
Branch: refs/heads/ignite-1009-v2
Commit: b1f4efd7fbc32fc05c03d9cc43ef5f4b4a354e90
Parents: 2b63ff8
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu Jun 11 23:00:34 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu Jun 11 23:01:55 2015 +0300
----------------------------------------------------------------------
.../cache/DynamicCacheChangeRequest.java | 17 ++++++
.../cache/DynamicCacheDescriptor.java | 54 +++++++++++++++++++-
.../processors/cache/GridCacheProcessor.java | 39 +++++++++-----
.../transactions/IgniteTxLocalAdapter.java | 4 ++
4 files changed, 100 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1f4efd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index c08a179..f344578 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -63,6 +63,9 @@ public class DynamicCacheChangeRequest implements Serializable {
/** Template configuration flag. */
private boolean template;
+ /** Local store flag. */
+ private boolean locStore;
+
/**
* Constructor creates cache stop request.
*
@@ -220,6 +223,20 @@ public class DynamicCacheChangeRequest implements Serializable {
this.failIfExists = failIfExists;
}
+ /**
+ * @param localStore Local store flag.
+ */
+ public void localStore(boolean localStore) {
+ this.locStore = localStore;
+ }
+
+ /**
+ * @return Local store flag.
+ */
+ public boolean localStore() {
+ return locStore;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1f4efd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 9c6cc43..2df9e9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.cache;
+import org.apache.ignite.cache.store.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.store.*;
import org.apache.ignite.internal.processors.plugin.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -55,6 +57,9 @@ public class DynamicCacheDescriptor {
/** */
private volatile Map<UUID, CacheConfiguration> rmtCfgs;
+ /** */
+ private volatile Map<UUID, Boolean> rmtLocStore;
+
/** Template configuration flag. */
private boolean template;
@@ -64,6 +69,12 @@ public class DynamicCacheDescriptor {
/** */
private boolean updatesAllowed = true;
+ /** Local store flag. */
+ private boolean locStore;
+
+ /** Cache store. */
+ private CacheStore store;
+
/**
* @param ctx Context.
* @param cacheCfg Cache configuration.
@@ -75,11 +86,15 @@ public class DynamicCacheDescriptor {
CacheConfiguration cacheCfg,
CacheType cacheType,
boolean template,
- IgniteUuid deploymentId) {
+ IgniteUuid deploymentId, CacheStore store) {
this.cacheCfg = cacheCfg;
this.cacheType = cacheType;
this.template = template;
this.deploymentId = deploymentId;
+ this.store = store;
+
+ if (store != null)
+ locStore = U.hasAnnotation(store, CacheLocalStore.class);
pluginMgr = new CachePluginManager(ctx, cacheCfg);
}
@@ -205,6 +220,29 @@ public class DynamicCacheDescriptor {
}
/**
+ * @param nodeId Remote node ID.
+ * @param localStore Remote local store flag.
+ */
+ public void addRemoteLocalStore(UUID nodeId, Boolean localStore) {
+ Map<UUID, Boolean> cfgs = rmtLocStore;
+
+ if (cfgs == null)
+ rmtLocStore = cfgs = new HashMap<>();
+
+ cfgs.put(nodeId, localStore);
+ }
+
+ /**
+ * @param nodeId Remote node ID.
+ * @return Remote local store flag.
+ */
+ public boolean remoteLocalStore(UUID nodeId) {
+ Map<UUID, Boolean> cfgs = rmtLocStore;
+
+ return cfgs == null ? null : cfgs.get(nodeId);
+ }
+
+ /**
*
*/
public void clearRemoteConfigurations() {
@@ -225,6 +263,20 @@ public class DynamicCacheDescriptor {
this.updatesAllowed = updatesAllowed;
}
+ /**
+ * @return Local store flag.
+ */
+ public boolean localStore() {
+ return locStore;
+ }
+
+ /**
+ * @return Cache store.
+ */
+ public CacheStore store() {
+ return store;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName()));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1f4efd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9b16388..97ee2c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -611,8 +611,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
+ CacheStore store = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null;
+
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheType, template,
- IgniteUuid.randomUuid());
+ IgniteUuid.randomUuid(), store);
desc.locallyConfigured(true);
desc.staticallyConfigured(true);
@@ -644,7 +646,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cfg.getName() == null) { // Use cache configuration with null name as template.
DynamicCacheDescriptor desc0 =
- new DynamicCacheDescriptor(ctx, cfg, cacheType, true, IgniteUuid.randomUuid());
+ new DynamicCacheDescriptor(ctx, cfg, cacheType, true, IgniteUuid.randomUuid(), store);
desc0.locallyConfigured(true);
desc0.staticallyConfigured(true);
@@ -691,7 +693,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (rmtCfg != null) {
CacheConfiguration locCfg = desc.cacheConfiguration();
- checkCache(locCfg, rmtCfg, n, desc);
+ checkCache(locCfg, rmtCfg, n, desc, desc.remoteLocalStore(n.id()));
// Check plugin cache configurations.
CachePluginManager pluginMgr = desc.pluginManager();
@@ -721,8 +723,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CachePluginManager pluginMgr = desc.pluginManager();
- GridCacheContext ctx = createCache(
- ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
+ GridCacheContext ctx = createCache( ccfg, pluginMgr, desc.store(),
+ desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
ctx.dynamicDeploymentId(desc.deploymentId());
@@ -1058,6 +1060,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param cfg Cache configuration to use to create cache.
* @param pluginMgr Cache plugin manager.
+ * @param store Cache store.
* @param cacheType Cache type.
* @param cacheObjCtx Cache object context.
* @return Cache context.
@@ -1065,6 +1068,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*/
private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
@Nullable CachePluginManager pluginMgr,
+ CacheStore<?,?> store,
CacheType cacheType,
CacheObjectContext cacheObjCtx,
boolean updatesAllowed)
@@ -1072,7 +1076,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
{
assert cfg != null;
- CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null;
+ CacheStore cfgStore = store;
+
+ if (cfgStore == null)
+ cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null;
validate(ctx.config(), cfg, cacheType, cfgStore);
@@ -1459,7 +1466,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
- GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
+ GridCacheContext cacheCtx = createCache(ccfg, null, null, cacheType, cacheObjCtx, true);
cacheCtx.startTopologyVersion(topVer);
@@ -1628,6 +1635,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.cacheType(desc.cacheType());
+ req.localStore(desc.localStore());
+
req.deploymentId(desc.deploymentId());
reqs.add(req);
@@ -1672,7 +1681,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ccfg,
req.cacheType(),
true,
- req.deploymentId());
+ req.deploymentId(),
+ null);
registeredTemplates.put(maskNull(req.cacheName()), desc);
}
@@ -1690,6 +1700,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
existing.deploymentId(req.deploymentId());
existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
+ existing.addRemoteLocalStore(rmtNodeId, req.localStore());
ctx.discovery().setCacheFilter(
req.cacheName(),
@@ -1706,7 +1717,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ccfg,
req.cacheType(),
false,
- req.deploymentId());
+ req.deploymentId(),
+ null);
// Received statically configured cache.
if (req.initiatingNodeId() == null)
@@ -2068,7 +2080,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (desc == null) {
DynamicCacheDescriptor templateDesc =
- new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true, req.deploymentId());
+ new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true, req.deploymentId(), null);
DynamicCacheDescriptor old = registeredTemplates.put(maskNull(ccfg.getName()), templateDesc);
@@ -2122,7 +2134,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
assert req.cacheType() != null : req;
DynamicCacheDescriptor startDesc =
- new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
+ new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId(), null);
DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc);
@@ -2247,10 +2259,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param rmtCfg Remote configuration.
* @param rmtNode Remote node.
* @param desc Cache descriptor.
+ * @param rmtLocalStore Rempte local store flag.
* @throws IgniteCheckedException If check failed.
*/
private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode,
- DynamicCacheDescriptor desc) throws IgniteCheckedException {
+ DynamicCacheDescriptor desc, boolean rmtLocalStore) throws IgniteCheckedException {
ClusterNode locNode = ctx.discovery().localNode();
UUID rmt = rmtNode.id();
@@ -2276,7 +2289,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
boolean checkStore;
- if (!isLocAff && isRmtAff && locCfg.getAtomicityMode() == TRANSACTIONAL) {
+ if (!isLocAff && isRmtAff && locCfg.getAtomicityMode() == TRANSACTIONAL && !rmtLocalStore) {
checkStore = locAttr.storeFactoryClassName() != null;
if (locAttr.storeFactoryClassName() == null && rmtAttr.storeFactoryClassName() != null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1f4efd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 8b5eaec..8a82db2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -503,6 +503,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
boolean skipNear = near() && isWriteToStoreFromDht;
for (IgniteTxEntry e : writeEntries) {
+ if (e.context().store() != null && e.context().store().isLocal() &&
+ !e.context().affinityNode())
+ continue;
+
if ((skipNear && e.cached().isNear()) || e.skipStore())
continue;