You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/10/23 16:05:48 UTC
ignite git commit: ignite-1272: fixed deployment of entry processors
for portable caches
Repository: ignite
Updated Branches:
refs/heads/ignite-1272 c459c0bc5 -> 436e724d7
ignite-1272: fixed deployment of entry processors for portable caches
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/436e724d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/436e724d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/436e724d
Branch: refs/heads/ignite-1272
Commit: 436e724d7def210485d863355ac336d3d515a993
Parents: c459c0b
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Oct 23 17:05:30 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Oct 23 17:05:30 2015 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 10 ++-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 12 ++-
...idCacheEntryProcessorDeploymentSelfTest.java | 83 +++++++++++---------
3 files changed, 60 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/436e724d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index cba6872..40b4a3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -837,7 +837,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
taskNameHash,
opCtx != null && opCtx.skipStore(),
opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
- waitTopFut);
+ waitTopFut,
+ // force addition of deployment info for entry processors regardless of cache specific dep. info setting.
+ ctx.shared().deploymentEnabled() && map == null && invokeMap != null);
return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@Override public IgniteInternalFuture<Object> apply() {
@@ -902,7 +904,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
taskNameHash,
opCtx != null && opCtx.skipStore(),
opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
- true);
+ true,
+ false);
if (statsEnabled)
updateFut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start));
@@ -2368,7 +2371,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.taskNameHash(),
req.skipStore(),
MAX_RETRIES,
- true);
+ true,
+ ctx.shared().deploymentEnabled() && req.deployInfo() != null);
updateFut.map();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/436e724d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ae662c8..ab4ccb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -147,6 +147,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** State. */
private final UpdateState state;
+ /** Force addition of deployment info or not. */
+ private final boolean forceAddDepInfo;
+
/**
* @param cctx Cache context.
* @param cache Cache instance.
@@ -166,6 +169,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
* @param skipStore Skip store flag.
* @param remapCnt Maximum number of retries.
* @param waitTopFut If {@code false} does not wait for affinity change future.
+ * @param forceAddDepInfo Forces addition of deployment info.
*/
public GridNearAtomicUpdateFuture(
GridCacheContext cctx,
@@ -185,7 +189,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
int taskNameHash,
boolean skipStore,
int remapCnt,
- boolean waitTopFut
+ boolean waitTopFut,
+ boolean forceAddDepInfo
) {
this.rawRetval = rawRetval;
@@ -210,6 +215,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
this.taskNameHash = taskNameHash;
this.skipStore = skipStore;
this.waitTopFut = waitTopFut;
+ this.forceAddDepInfo = forceAddDepInfo;
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
@@ -1051,7 +1057,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
taskNameHash,
skipStore,
cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled());
+ cctx.deploymentEnabled() || forceAddDepInfo);
pendingMappings.put(nodeId, mapped);
}
@@ -1144,7 +1150,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
taskNameHash,
skipStore,
cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled());
+ cctx.deploymentEnabled() || forceAddDepInfo);
req.addUpdateEntry(cacheKey,
val,
http://git-wip-us.apache.org/repos/asf/ignite/blob/436e724d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryProcessorDeploymentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryProcessorDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryProcessorDeploymentSelfTest.java
index 269b216..494ea42 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryProcessorDeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryProcessorDeploymentSelfTest.java
@@ -31,7 +31,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -87,18 +87,13 @@ public class GridCacheEntryProcessorDeploymentSelfTest extends GridCommonAbstrac
cfg.setCacheMode(PARTITIONED);
cfg.setWriteSynchronizationMode(FULL_SYNC);
cfg.setRebalanceMode(SYNC);
- cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setAtomicityMode(ATOMIC);
cfg.setNearConfiguration(new NearCacheConfiguration());
cfg.setBackups(1);
return cfg;
}
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
- }
-
/**
* @throws Exception In case of error.
*/
@@ -139,63 +134,73 @@ public class GridCacheEntryProcessorDeploymentSelfTest extends GridCommonAbstrac
* @throws Exception In case of error.
*/
private void doTestInvoke() throws Exception {
- cliendMode = false;
- startGrid(0);
+ try {
+ cliendMode = false;
+ startGrid(0);
- cliendMode = true;
- startGrid(1);
+ cliendMode = true;
+ startGrid(1);
- ClassLoader ldr = getExternalClassLoader();
+ ClassLoader ldr = getExternalClassLoader();
- Class procCls = ldr.loadClass(TEST_ENT_PROCESSOR);
- Class valCls = ldr.loadClass(TEST_VALUE);
+ Class procCls = ldr.loadClass(TEST_ENT_PROCESSOR);
+ Class valCls = ldr.loadClass(TEST_VALUE);
- assertTrue(grid(1).configuration().isClientMode());
+ assertTrue(grid(1).configuration().isClientMode());
- IgniteCache cache = grid(1).cache(null);
+ IgniteCache cache = grid(1).cache(null);
- cache.put("key", valCls.newInstance());
+ cache.put("key", valCls.newInstance());
- Boolean res = (Boolean)cache.invoke("key", (CacheEntryProcessor)procCls.newInstance());
+ Boolean res = (Boolean)cache.invoke("key", (CacheEntryProcessor)procCls.newInstance());
- assertTrue(res);
+ assertTrue(res);
+ }
+ finally {
+ stopAllGrids();
+ }
}
/**
* @throws Exception In case of error.
*/
private void doTestInvokeAll() throws Exception {
- cliendMode = false;
- startGrid(0);
+ try {
+ cliendMode = false;
+ startGrid(0);
- cliendMode = true;
- startGrid(1);
+ cliendMode = true;
+ startGrid(1);
- ClassLoader ldr = getExternalClassLoader();
+ ClassLoader ldr = getExternalClassLoader();
- Class procCls = ldr.loadClass(TEST_ENT_PROCESSOR);
- Class valCls = ldr.loadClass(TEST_VALUE);
+ Class procCls = ldr.loadClass(TEST_ENT_PROCESSOR);
+ Class valCls = ldr.loadClass(TEST_VALUE);
- assertTrue(grid(1).configuration().isClientMode());
+ assertTrue(grid(1).configuration().isClientMode());
- IgniteCache cache = grid(1).cache(null);
+ IgniteCache cache = grid(1).cache(null);
- HashSet keys = new HashSet();
+ HashSet keys = new HashSet();
- for (int i = 0; i < 3; i++) {
- String key = "key" + i;
+ for (int i = 0; i < 3; i++) {
+ String key = "key" + i;
- cache.put(key, valCls.newInstance());
+ cache.put(key, valCls.newInstance());
- keys.add(key);
- }
+ keys.add(key);
+ }
- Map<String, EntryProcessorResult> res = (Map<String, EntryProcessorResult>)cache.invokeAll(keys,
- (CacheEntryProcessor)procCls.newInstance());
+ Map<String, EntryProcessorResult> res = (Map<String, EntryProcessorResult>)cache.invokeAll(keys,
+ (CacheEntryProcessor)procCls.newInstance());
- assertEquals(3, res.size());
+ assertEquals(3, res.size());
- for (EntryProcessorResult result : res.values())
- assertTrue((Boolean)result.get());
+ for (EntryProcessorResult result : res.values())
+ assertTrue((Boolean)result.get());
+ }
+ finally {
+ stopAllGrids();
+ }
}
}