You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/10/23 16:05:48 UTC

ignite git commit: ignite-1272: fixed deployment of entry processors for portable caches

Repository: ignite
Updated Branches:
  refs/heads/ignite-1272 c459c0bc5 -> 436e724d7


ignite-1272: fixed deployment of entry processors for portable caches


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/436e724d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/436e724d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/436e724d

Branch: refs/heads/ignite-1272
Commit: 436e724d7def210485d863355ac336d3d515a993
Parents: c459c0b
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Oct 23 17:05:30 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Oct 23 17:05:30 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 10 ++-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 12 ++-
 ...idCacheEntryProcessorDeploymentSelfTest.java | 83 +++++++++++---------
 3 files changed, 60 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/436e724d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index cba6872..40b4a3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -837,7 +837,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             taskNameHash,
             opCtx != null && opCtx.skipStore(),
             opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
-            waitTopFut);
+            waitTopFut,
+            // force addition of deployment info for entry processors regardless of cache specific dep. info setting.
+            ctx.shared().deploymentEnabled() && map == null && invokeMap != null);
 
         return asyncOp(new CO<IgniteInternalFuture<Object>>() {
             @Override public IgniteInternalFuture<Object> apply() {
@@ -902,7 +904,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             taskNameHash,
             opCtx != null && opCtx.skipStore(),
             opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
-            true);
+            true,
+            false);
 
         if (statsEnabled)
             updateFut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start));
@@ -2368,7 +2371,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             req.taskNameHash(),
             req.skipStore(),
             MAX_RETRIES,
-            true);
+            true,
+            ctx.shared().deploymentEnabled() && req.deployInfo() != null);
 
         updateFut.map();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/436e724d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ae662c8..ab4ccb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -147,6 +147,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     /** State. */
     private final UpdateState state;
 
+    /** Force addition of deployment info or not. */
+    private final boolean forceAddDepInfo;
+
     /**
      * @param cctx Cache context.
      * @param cache Cache instance.
@@ -166,6 +169,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param skipStore Skip store flag.
      * @param remapCnt Maximum number of retries.
      * @param waitTopFut If {@code false} does not wait for affinity change future.
+     * @param forceAddDepInfo Forces addition of deployment info.
      */
     public GridNearAtomicUpdateFuture(
         GridCacheContext cctx,
@@ -185,7 +189,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         int taskNameHash,
         boolean skipStore,
         int remapCnt,
-        boolean waitTopFut
+        boolean waitTopFut,
+        boolean forceAddDepInfo
     ) {
         this.rawRetval = rawRetval;
 
@@ -210,6 +215,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         this.taskNameHash = taskNameHash;
         this.skipStore = skipStore;
         this.waitTopFut = waitTopFut;
+        this.forceAddDepInfo = forceAddDepInfo;
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
@@ -1051,7 +1057,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                             taskNameHash,
                             skipStore,
                             cctx.kernalContext().clientNode(),
-                            cctx.deploymentEnabled());
+                            cctx.deploymentEnabled() || forceAddDepInfo);
 
                         pendingMappings.put(nodeId, mapped);
                     }
@@ -1144,7 +1150,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 taskNameHash,
                 skipStore,
                 cctx.kernalContext().clientNode(),
-                cctx.deploymentEnabled());
+                cctx.deploymentEnabled() || forceAddDepInfo);
 
             req.addUpdateEntry(cacheKey,
                 val,

http://git-wip-us.apache.org/repos/asf/ignite/blob/436e724d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryProcessorDeploymentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryProcessorDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryProcessorDeploymentSelfTest.java
index 269b216..494ea42 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryProcessorDeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryProcessorDeploymentSelfTest.java
@@ -31,7 +31,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -87,18 +87,13 @@ public class GridCacheEntryProcessorDeploymentSelfTest extends GridCommonAbstrac
         cfg.setCacheMode(PARTITIONED);
         cfg.setWriteSynchronizationMode(FULL_SYNC);
         cfg.setRebalanceMode(SYNC);
-        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setAtomicityMode(ATOMIC);
         cfg.setNearConfiguration(new NearCacheConfiguration());
         cfg.setBackups(1);
 
         return cfg;
     }
 
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-    }
-
     /**
      * @throws Exception In case of error.
      */
@@ -139,63 +134,73 @@ public class GridCacheEntryProcessorDeploymentSelfTest extends GridCommonAbstrac
      * @throws Exception In case of error.
      */
     private void doTestInvoke() throws Exception {
-        cliendMode = false;
-        startGrid(0);
+        try {
+            cliendMode = false;
+            startGrid(0);
 
-        cliendMode = true;
-        startGrid(1);
+            cliendMode = true;
+            startGrid(1);
 
-        ClassLoader ldr = getExternalClassLoader();
+            ClassLoader ldr = getExternalClassLoader();
 
-        Class procCls = ldr.loadClass(TEST_ENT_PROCESSOR);
-        Class valCls = ldr.loadClass(TEST_VALUE);
+            Class procCls = ldr.loadClass(TEST_ENT_PROCESSOR);
+            Class valCls = ldr.loadClass(TEST_VALUE);
 
-        assertTrue(grid(1).configuration().isClientMode());
+            assertTrue(grid(1).configuration().isClientMode());
 
-        IgniteCache cache = grid(1).cache(null);
+            IgniteCache cache = grid(1).cache(null);
 
-        cache.put("key", valCls.newInstance());
+            cache.put("key", valCls.newInstance());
 
-        Boolean res = (Boolean)cache.invoke("key", (CacheEntryProcessor)procCls.newInstance());
+            Boolean res = (Boolean)cache.invoke("key", (CacheEntryProcessor)procCls.newInstance());
 
-        assertTrue(res);
+            assertTrue(res);
+        }
+        finally {
+            stopAllGrids();
+        }
     }
 
     /**
      * @throws Exception In case of error.
      */
     private void doTestInvokeAll() throws Exception {
-        cliendMode = false;
-        startGrid(0);
+        try {
+            cliendMode = false;
+            startGrid(0);
 
-        cliendMode = true;
-        startGrid(1);
+            cliendMode = true;
+            startGrid(1);
 
-        ClassLoader ldr = getExternalClassLoader();
+            ClassLoader ldr = getExternalClassLoader();
 
-        Class procCls = ldr.loadClass(TEST_ENT_PROCESSOR);
-        Class valCls = ldr.loadClass(TEST_VALUE);
+            Class procCls = ldr.loadClass(TEST_ENT_PROCESSOR);
+            Class valCls = ldr.loadClass(TEST_VALUE);
 
-        assertTrue(grid(1).configuration().isClientMode());
+            assertTrue(grid(1).configuration().isClientMode());
 
-        IgniteCache cache = grid(1).cache(null);
+            IgniteCache cache = grid(1).cache(null);
 
-        HashSet keys = new HashSet();
+            HashSet keys = new HashSet();
 
-        for (int i = 0; i < 3; i++) {
-            String key = "key" + i;
+            for (int i = 0; i < 3; i++) {
+                String key = "key" + i;
 
-            cache.put(key, valCls.newInstance());
+                cache.put(key, valCls.newInstance());
 
-            keys.add(key);
-        }
+                keys.add(key);
+            }
 
-        Map<String, EntryProcessorResult> res = (Map<String, EntryProcessorResult>)cache.invokeAll(keys,
-            (CacheEntryProcessor)procCls.newInstance());
+            Map<String, EntryProcessorResult> res = (Map<String, EntryProcessorResult>)cache.invokeAll(keys,
+                (CacheEntryProcessor)procCls.newInstance());
 
-        assertEquals(3, res.size());
+            assertEquals(3, res.size());
 
-        for (EntryProcessorResult result : res.values())
-            assertTrue((Boolean)result.get());
+            for (EntryProcessorResult result : res.values())
+                assertTrue((Boolean)result.get());
+        }
+        finally {
+            stopAllGrids();
+        }
     }
 }