You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 22:04:49 UTC

incubator-ignite git commit: #ignite-1009: Fix local store configuration on clients.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1009-v2 [created] b1f4efd7f


#ignite-1009: Fix local store configuration on clients.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b1f4efd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b1f4efd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b1f4efd7

Branch: refs/heads/ignite-1009-v2
Commit: b1f4efd7fbc32fc05c03d9cc43ef5f4b4a354e90
Parents: 2b63ff8
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu Jun 11 23:00:34 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu Jun 11 23:01:55 2015 +0300

----------------------------------------------------------------------
 .../cache/DynamicCacheChangeRequest.java        | 17 ++++++
 .../cache/DynamicCacheDescriptor.java           | 54 +++++++++++++++++++-
 .../processors/cache/GridCacheProcessor.java    | 39 +++++++++-----
 .../transactions/IgniteTxLocalAdapter.java      |  4 ++
 4 files changed, 100 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1f4efd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index c08a179..f344578 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -63,6 +63,9 @@ public class DynamicCacheChangeRequest implements Serializable {
     /** Template configuration flag. */
     private boolean template;
 
+    /** Local store flag. */
+    private boolean locStore;
+
     /**
      * Constructor creates cache stop request.
      *
@@ -220,6 +223,20 @@ public class DynamicCacheChangeRequest implements Serializable {
         this.failIfExists = failIfExists;
     }
 
+    /**
+     * @param localStore Local store flag.
+     */
+    public void localStore(boolean localStore) {
+        this.locStore = localStore;
+    }
+
+    /**
+     * @return Local store flag.
+     */
+    public boolean localStore() {
+        return locStore;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1f4efd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 9c6cc43..2df9e9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.processors.plugin.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -55,6 +57,9 @@ public class DynamicCacheDescriptor {
     /** */
     private volatile Map<UUID, CacheConfiguration> rmtCfgs;
 
+    /** */
+    private volatile Map<UUID, Boolean> rmtLocStore;
+
     /** Template configuration flag. */
     private boolean template;
 
@@ -64,6 +69,12 @@ public class DynamicCacheDescriptor {
     /** */
     private boolean updatesAllowed = true;
 
+    /** Local store flag. */
+    private boolean locStore;
+
+    /** Cache store. */
+    private CacheStore store;
+
     /**
      * @param ctx Context.
      * @param cacheCfg Cache configuration.
@@ -75,11 +86,15 @@ public class DynamicCacheDescriptor {
         CacheConfiguration cacheCfg,
         CacheType cacheType,
         boolean template,
-        IgniteUuid deploymentId) {
+        IgniteUuid deploymentId, CacheStore store) {
         this.cacheCfg = cacheCfg;
         this.cacheType = cacheType;
         this.template = template;
         this.deploymentId = deploymentId;
+        this.store = store;
+
+        if (store != null)
+            locStore = U.hasAnnotation(store, CacheLocalStore.class);
 
         pluginMgr = new CachePluginManager(ctx, cacheCfg);
     }
@@ -205,6 +220,29 @@ public class DynamicCacheDescriptor {
     }
 
     /**
+     * @param nodeId Remote node ID.
+     * @param localStore Remote local store flag.
+     */
+    public void addRemoteLocalStore(UUID nodeId, Boolean localStore) {
+        Map<UUID, Boolean> cfgs = rmtLocStore;
+
+        if (cfgs == null)
+            rmtLocStore = cfgs = new HashMap<>();
+
+        cfgs.put(nodeId, localStore);
+    }
+
+    /**
+     * @param nodeId Remote node ID.
+     * @return Remote local store flag.
+     */
+    public boolean remoteLocalStore(UUID nodeId) {
+        Map<UUID, Boolean> cfgs = rmtLocStore;
+
+        return cfgs == null ? null : cfgs.get(nodeId);
+    }
+
+    /**
      *
      */
     public void clearRemoteConfigurations() {
@@ -225,6 +263,20 @@ public class DynamicCacheDescriptor {
         this.updatesAllowed = updatesAllowed;
     }
 
+    /**
+     * @return Local store flag.
+     */
+    public boolean localStore() {
+        return locStore;
+    }
+
+    /**
+     * @return Cache store.
+     */
+    public CacheStore store() {
+        return store;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName()));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1f4efd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9b16388..97ee2c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -611,8 +611,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
 
+            CacheStore store = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null;
+
             DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheType, template,
-                IgniteUuid.randomUuid());
+                IgniteUuid.randomUuid(), store);
 
             desc.locallyConfigured(true);
             desc.staticallyConfigured(true);
@@ -644,7 +646,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             if (cfg.getName() == null) { // Use cache configuration with null name as template.
                 DynamicCacheDescriptor desc0 =
-                    new DynamicCacheDescriptor(ctx, cfg, cacheType, true, IgniteUuid.randomUuid());
+                    new DynamicCacheDescriptor(ctx, cfg, cacheType, true, IgniteUuid.randomUuid(), store);
 
                 desc0.locallyConfigured(true);
                 desc0.staticallyConfigured(true);
@@ -691,7 +693,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                         if (rmtCfg != null) {
                             CacheConfiguration locCfg = desc.cacheConfiguration();
 
-                            checkCache(locCfg, rmtCfg, n, desc);
+                            checkCache(locCfg, rmtCfg, n, desc, desc.remoteLocalStore(n.id()));
 
                             // Check plugin cache configurations.
                             CachePluginManager pluginMgr = desc.pluginManager();
@@ -721,8 +723,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     CachePluginManager pluginMgr = desc.pluginManager();
 
-                    GridCacheContext ctx = createCache(
-                        ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
+                    GridCacheContext ctx = createCache( ccfg, pluginMgr, desc.store(),
+                        desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
 
                     ctx.dynamicDeploymentId(desc.deploymentId());
 
@@ -1058,6 +1060,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * @param cfg Cache configuration to use to create cache.
      * @param pluginMgr Cache plugin manager.
+     * @param store Cache store.
      * @param cacheType Cache type.
      * @param cacheObjCtx Cache object context.
      * @return Cache context.
@@ -1065,6 +1068,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      */
     private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
         @Nullable CachePluginManager pluginMgr,
+        CacheStore<?,?> store,
         CacheType cacheType,
         CacheObjectContext cacheObjCtx,
         boolean updatesAllowed)
@@ -1072,7 +1076,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     {
         assert cfg != null;
 
-        CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null;
+        CacheStore cfgStore = store;
+
+        if (cfgStore == null)
+            cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null;
 
         validate(ctx.config(), cfg, cacheType, cfgStore);
 
@@ -1459,7 +1466,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
-            GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
+            GridCacheContext cacheCtx = createCache(ccfg, null, null, cacheType, cacheObjCtx, true);
 
             cacheCtx.startTopologyVersion(topVer);
 
@@ -1628,6 +1635,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 req.cacheType(desc.cacheType());
 
+                req.localStore(desc.localStore());
+
                 req.deploymentId(desc.deploymentId());
 
                 reqs.add(req);
@@ -1672,7 +1681,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                             ccfg,
                             req.cacheType(),
                             true,
-                            req.deploymentId());
+                            req.deploymentId(),
+                            null);
 
                         registeredTemplates.put(maskNull(req.cacheName()), desc);
                     }
@@ -1690,6 +1700,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                             existing.deploymentId(req.deploymentId());
 
                             existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
+                            existing.addRemoteLocalStore(rmtNodeId, req.localStore());
 
                             ctx.discovery().setCacheFilter(
                                 req.cacheName(),
@@ -1706,7 +1717,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                             ccfg,
                             req.cacheType(),
                             false,
-                            req.deploymentId());
+                            req.deploymentId(),
+                            null);
 
                         // Received statically configured cache.
                         if (req.initiatingNodeId() == null)
@@ -2068,7 +2080,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 if (desc == null) {
                     DynamicCacheDescriptor templateDesc =
-                        new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true, req.deploymentId());
+                        new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true, req.deploymentId(), null);
 
                     DynamicCacheDescriptor old = registeredTemplates.put(maskNull(ccfg.getName()), templateDesc);
 
@@ -2122,7 +2134,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     assert req.cacheType() != null : req;
 
                     DynamicCacheDescriptor startDesc =
-                        new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
+                        new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId(), null);
 
                     DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc);
 
@@ -2247,10 +2259,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param rmtCfg Remote configuration.
      * @param rmtNode Remote node.
      * @param desc Cache descriptor.
+     * @param rmtLocalStore Rempte local store flag.
      * @throws IgniteCheckedException If check failed.
      */
     private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode,
-        DynamicCacheDescriptor desc) throws IgniteCheckedException {
+        DynamicCacheDescriptor desc, boolean rmtLocalStore) throws IgniteCheckedException {
         ClusterNode locNode = ctx.discovery().localNode();
 
         UUID rmt = rmtNode.id();
@@ -2276,7 +2289,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             boolean checkStore;
 
-            if (!isLocAff && isRmtAff && locCfg.getAtomicityMode() == TRANSACTIONAL) {
+            if (!isLocAff && isRmtAff && locCfg.getAtomicityMode() == TRANSACTIONAL && !rmtLocalStore) {
                 checkStore = locAttr.storeFactoryClassName() != null;
 
                 if (locAttr.storeFactoryClassName() == null && rmtAttr.storeFactoryClassName() != null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1f4efd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 8b5eaec..8a82db2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -503,6 +503,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     boolean skipNear = near() && isWriteToStoreFromDht;
 
                     for (IgniteTxEntry e : writeEntries) {
+                        if (e.context().store() != null && e.context().store().isLocal() &&
+                            !e.context().affinityNode())
+                            continue;
+
                         if ((skipNear && e.cached().isNear()) || e.skipStore())
                             continue;