You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/30 13:11:57 UTC
[12/18] incubator-ignite git commit: # ignite-44
# ignite-44
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9fd2f235
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9fd2f235
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9fd2f235
Branch: refs/heads/ignite-1
Commit: 9fd2f235eae58c7963a500e212bc70fd84ae010c
Parents: d60f91e
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 26 17:12:45 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 26 18:00:06 2014 +0300
----------------------------------------------------------------------
.../processors/cache/IgniteCacheProxy.java | 2 +-
.../dht/atomic/GridDhtAtomicCache.java | 2 -
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 259 ++++++++++++-------
.../transactions/IgniteTxLocalAdapter.java | 6 +-
.../dataload/GridDataLoadUpdateJob.java | 1 +
.../cache/IgniteCacheInvokeAbstractTest.java | 38 ++-
.../GridCacheDataStructuresSelfTestSuite.java | 5 +-
7 files changed, 194 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fd2f235/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 892bbd8..eacae0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -671,7 +671,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
throws IgniteCheckedException {
EntryProcessorResult<T> res = fut.get();
- return res.get();
+ return res != null ? res.get() : null;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fd2f235/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7b1b3a0..e9bb16e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1335,7 +1335,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
firstEntryIdx = i + 1;
putMap = null;
- entryProcessorMap = null;
filtered = new ArrayList<>();
}
@@ -1377,7 +1376,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
firstEntryIdx = i + 1;
rmvKeys = null;
- entryProcessorMap = null;
filtered = new ArrayList<>();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fd2f235/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 9441bd3..60e42bc 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -231,8 +231,11 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
keys.add(key);
this.keyBytes.add(keyBytes);
- if (forceTransformBackups && entryProcessor != null)
+ if (forceTransformBackups) {
+ assert entryProcessor != null;
+
entryProcessors.add(entryProcessor);
+ }
else {
vals.add(val);
this.valBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null);
@@ -702,6 +705,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
_clone.drVers = drVers;
_clone.ttls = ttls;
_clone.drExpireTimes = drExpireTimes;
+ _clone.nearTtls = nearTtls;
+ _clone.nearExpireTimes = nearExpireTimes;
_clone.syncMode = syncMode;
_clone.nearKeys = nearKeys;
_clone.nearKeyBytes = nearKeyBytes;
@@ -712,8 +717,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
_clone.entryProcessorsBytes = entryProcessorsBytes;
_clone.nearEntryProcessors = nearEntryProcessors;
_clone.nearEntryProcessorsBytes = nearEntryProcessorsBytes;
- _clone.nearExpireTimes = nearExpireTimes;
- _clone.nearTtls = nearTtls;
+ _clone.invokeArgs = invokeArgs;
+ _clone.invokeArgsBytes = invokeArgsBytes;
_clone.subjId = subjId;
_clone.taskNameHash = taskNameHash;
}
@@ -741,12 +746,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
case 4:
- if (!commState.putLongList(ttls))
- return false;
-
- commState.idx++;
-
- case 5:
if (drVers != null) {
if (commState.it == null) {
if (!commState.putInt(drVers.size()))
@@ -773,12 +772,39 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 6:
+ case 5:
if (!commState.putCacheVersion(futVer))
return false;
commState.idx++;
+ case 6:
+ if (invokeArgsBytes != null) {
+ if (commState.it == null) {
+ if (!commState.putInt(invokeArgsBytes.length))
+ return false;
+
+ commState.it = arrayIterator(invokeArgsBytes);
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
+ if (!commState.putByteArray((byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
+ if (!commState.putInt(-1))
+ return false;
+ }
+
+ commState.idx++;
+
case 7:
if (keyBytes != null) {
if (commState.it == null) {
@@ -807,24 +833,42 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
case 8:
- if (!commState.putUuid(nodeId))
+ if (!commState.putLongList(nearExpireTimes))
return false;
commState.idx++;
case 9:
- if (!commState.putEnum(syncMode))
+ if (!commState.putLongList(nearTtls))
return false;
commState.idx++;
case 10:
- if (!commState.putLong(topVer))
+ if (!commState.putUuid(nodeId))
return false;
commState.idx++;
case 11:
+ if (!commState.putEnum(syncMode))
+ return false;
+
+ commState.idx++;
+
+ case 12:
+ if (!commState.putLong(topVer))
+ return false;
+
+ commState.idx++;
+
+ case 13:
+ if (!commState.putLongList(ttls))
+ return false;
+
+ commState.idx++;
+
+ case 14:
if (valBytes != null) {
if (commState.it == null) {
if (!commState.putInt(valBytes.size()))
@@ -851,13 +895,13 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 12:
+ case 15:
if (!commState.putCacheVersion(writeVer))
return false;
commState.idx++;
- case 13:
+ case 16:
if (nearKeyBytes != null) {
if (commState.it == null) {
if (!commState.putInt(nearKeyBytes.size()))
@@ -884,7 +928,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 14:
+ case 17:
if (nearValBytes != null) {
if (commState.it == null) {
if (!commState.putInt(nearValBytes.size()))
@@ -897,7 +941,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
if (commState.cur == NULL)
commState.cur = commState.it.next();
- if (!commState.putValueBytes((GridCacheValueBytes) commState.cur))
+ if (!commState.putValueBytes((GridCacheValueBytes)commState.cur))
return false;
commState.cur = NULL;
@@ -911,19 +955,13 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 15:
- if (!commState.putBoolean(forceTransformBackups))
- return false;
-
- commState.idx++;
-
- case 16:
- if (nearEntryProcessorsBytes != null) {
+ case 18:
+ if (entryProcessorsBytes != null) {
if (commState.it == null) {
- if (!commState.putInt(nearEntryProcessorsBytes.size()))
+ if (!commState.putInt(entryProcessorsBytes.size()))
return false;
- commState.it = nearEntryProcessorsBytes.iterator();
+ commState.it = entryProcessorsBytes.iterator();
}
while (commState.it.hasNext() || commState.cur != NULL) {
@@ -944,13 +982,19 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 17:
- if (entryProcessorsBytes != null) {
+ case 19:
+ if (!commState.putBoolean(forceTransformBackups))
+ return false;
+
+ commState.idx++;
+
+ case 20:
+ if (nearEntryProcessorsBytes != null) {
if (commState.it == null) {
- if (!commState.putInt(entryProcessorsBytes.size()))
+ if (!commState.putInt(nearEntryProcessorsBytes.size()))
return false;
- commState.it = entryProcessorsBytes.iterator();
+ commState.it = nearEntryProcessorsBytes.iterator();
}
while (commState.it.hasNext() || commState.cur != NULL) {
@@ -971,29 +1015,18 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 18:
+ case 21:
if (!commState.putUuid(subjId))
return false;
commState.idx++;
- case 19:
+ case 22:
if (!commState.putInt(taskNameHash))
return false;
commState.idx++;
- case 20:
- if (!commState.putLongList(nearExpireTimes))
- return false;
-
- commState.idx++;
-
- case 21:
- if (!commState.putLongList(nearTtls))
- return false;
-
- commState.idx++;
}
return true;
@@ -1019,16 +1052,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
case 4:
- GridLongList drTtls0 = commState.getLongList();
-
- if (drTtls0 == LONG_LIST_NOT_READ)
- return false;
-
- ttls = drTtls0;
-
- commState.idx++;
-
- case 5:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -1057,7 +1080,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 6:
+ case 5:
GridCacheVersion futVer0 = commState.getCacheVersion();
if (futVer0 == CACHE_VER_NOT_READ)
@@ -1067,6 +1090,35 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
+ case 6:
+ if (commState.readSize == -1) {
+ if (buf.remaining() < 4)
+ return false;
+
+ commState.readSize = commState.getInt();
+ }
+
+ if (commState.readSize >= 0) {
+ if (invokeArgsBytes == null)
+ invokeArgsBytes = new byte[commState.readSize][];
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
+ byte[] _val = commState.getByteArray();
+
+ if (_val == BYTE_ARR_NOT_READ)
+ return false;
+
+ invokeArgsBytes[i] = (byte[])_val;
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
case 7:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
@@ -1097,6 +1149,26 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
case 8:
+ GridLongList nearExpireTimes0 = commState.getLongList();
+
+ if (nearExpireTimes0 == LONG_LIST_NOT_READ)
+ return false;
+
+ nearExpireTimes = nearExpireTimes0;
+
+ commState.idx++;
+
+ case 9:
+ GridLongList nearTtls0 = commState.getLongList();
+
+ if (nearTtls0 == LONG_LIST_NOT_READ)
+ return false;
+
+ nearTtls = nearTtls0;
+
+ commState.idx++;
+
+ case 10:
UUID nodeId0 = commState.getUuid();
if (nodeId0 == UUID_NOT_READ)
@@ -1106,7 +1178,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 9:
+ case 11:
if (buf.remaining() < 1)
return false;
@@ -1116,7 +1188,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 10:
+ case 12:
if (buf.remaining() < 8)
return false;
@@ -1124,7 +1196,17 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 11:
+ case 13:
+ GridLongList ttls0 = commState.getLongList();
+
+ if (ttls0 == LONG_LIST_NOT_READ)
+ return false;
+
+ ttls = ttls0;
+
+ commState.idx++;
+
+ case 14:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -1153,7 +1235,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 12:
+ case 15:
GridCacheVersion writeVer0 = commState.getCacheVersion();
if (writeVer0 == CACHE_VER_NOT_READ)
@@ -1163,7 +1245,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 13:
+ case 16:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -1192,7 +1274,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 14:
+ case 17:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -1221,15 +1303,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 15:
- if (buf.remaining() < 1)
- return false;
-
- forceTransformBackups = commState.getBoolean();
-
- commState.idx++;
-
- case 16:
+ case 18:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -1238,8 +1312,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
}
if (commState.readSize >= 0) {
- if (nearEntryProcessorsBytes == null)
- nearEntryProcessorsBytes = new ArrayList<>(commState.readSize);
+ if (entryProcessorsBytes == null)
+ entryProcessorsBytes = new ArrayList<>(commState.readSize);
for (int i = commState.readItems; i < commState.readSize; i++) {
byte[] _val = commState.getByteArray();
@@ -1247,7 +1321,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
if (_val == BYTE_ARR_NOT_READ)
return false;
- nearEntryProcessorsBytes.add((byte[]) _val);
+ entryProcessorsBytes.add((byte[])_val);
commState.readItems++;
}
@@ -1258,7 +1332,15 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 17:
+ case 19:
+ if (buf.remaining() < 1)
+ return false;
+
+ forceTransformBackups = commState.getBoolean();
+
+ commState.idx++;
+
+ case 20:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -1267,8 +1349,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
}
if (commState.readSize >= 0) {
- if (entryProcessorsBytes == null)
- entryProcessorsBytes = new ArrayList<>(commState.readSize);
+ if (nearEntryProcessorsBytes == null)
+ nearEntryProcessorsBytes = new ArrayList<>(commState.readSize);
for (int i = commState.readItems; i < commState.readSize; i++) {
byte[] _val = commState.getByteArray();
@@ -1276,7 +1358,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
if (_val == BYTE_ARR_NOT_READ)
return false;
- entryProcessorsBytes.add((byte[])_val);
+ nearEntryProcessorsBytes.add((byte[])_val);
commState.readItems++;
}
@@ -1287,7 +1369,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 18:
+ case 21:
UUID subjId0 = commState.getUuid();
if (subjId0 == UUID_NOT_READ)
@@ -1297,7 +1379,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 19:
+ case 22:
if (buf.remaining() < 4)
return false;
@@ -1305,25 +1387,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
- case 20:
- GridLongList nearExpireTimes0 = commState.getLongList();
-
- if (nearExpireTimes0 == LONG_LIST_NOT_READ)
- return false;
-
- nearExpireTimes = nearExpireTimes0;
-
- commState.idx++;
-
- case 21:
- GridLongList nearTtls0 = commState.getLongList();
-
- if (nearTtls0 == LONG_LIST_NOT_READ)
- return false;
-
- nearTtls = nearTtls0;
-
- commState.idx++;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fd2f235/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
index a66e584..d349215 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -2264,10 +2264,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
try {
if (!hasPrevVal)
v = cached.innerGet(this,
- /*swap*/retval,
- /*read-through*/retval,
+ /*swap*/true,
+ /*read-through*/true,
/*failFast*/false,
- /*unmarshal*/retval,
+ /*unmarshal*/true,
/*metrics*/true,
/*event*/!dht(),
/*temporary*/false,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fd2f235/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java
index a96fc63..fb1b12a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java
@@ -70,6 +70,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
if (log.isDebugEnabled())
log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
+// TODO IGNITE-77: restore adapter usage.
// GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
//
// IgniteFuture<?> f = cache.context().preloader().startFuture();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fd2f235/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
index c5d2363..380eced 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.*;
import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
import static org.apache.ignite.transactions.IgniteTxIsolation.*;
import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheFlag.*;
import static org.gridgain.grid.cache.GridCacheMode.*;
/**
@@ -37,24 +38,29 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
* @throws Exception If failed.
*/
public void testInvoke() throws Exception {
- // TODO IGNITE41 test with forceTransformBackups.
+ IgniteCache<Integer, Integer> cache = jcache();
- invoke(null);
+ invoke(cache, null);
if (atomicityMode() == TRANSACTIONAL) {
- invoke(PESSIMISTIC);
+ invoke(cache, PESSIMISTIC);
+
+ invoke(cache, OPTIMISTIC);
+ }
+ else if (gridCount() > 1) {
+ cache = cache.flagsOn(FORCE_TRANSFORM_BACKUP);
- invoke(OPTIMISTIC);
+ invoke(cache, null);
}
}
/**
+ * @param cache Cache.
* @param txMode Not null transaction concurrency mode if explicit transaction should be started.
* @throws Exception If failed.
*/
- private void invoke(@Nullable IgniteTxConcurrency txMode) throws Exception {
- final IgniteCache<Integer, Integer> cache = jcache();
-
+ private void invoke(final IgniteCache<Integer, Integer> cache, @Nullable IgniteTxConcurrency txMode)
+ throws Exception {
IncrementProcessor incProcessor = new IncrementProcessor();
for (final Integer key : keys()) {
@@ -147,22 +153,28 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
* @throws Exception If failed.
*/
public void testInvokeAll() throws Exception {
- invokeAll(null);
+ IgniteCache<Integer, Integer> cache = jcache();
+
+ invokeAll(cache, null);
if (atomicityMode() == TRANSACTIONAL) {
- invoke(PESSIMISTIC);
+ invokeAll(cache, PESSIMISTIC);
+
+ invokeAll(cache, OPTIMISTIC);
+ }
+ else if (gridCount() > 1) {
+ cache = cache.flagsOn(FORCE_TRANSFORM_BACKUP);
- invoke(OPTIMISTIC);
+ invokeAll(cache, null);
}
}
/**
+ * @param cache Cache.
* @param txMode Not null transaction concurrency mode if explicit transaction should be started.
* @throws Exception If failed.
*/
- private void invokeAll(@Nullable IgniteTxConcurrency txMode) throws Exception {
- IgniteCache<Integer, Integer> cache = jcache();
-
+ private void invokeAll(IgniteCache<Integer, Integer> cache, @Nullable IgniteTxConcurrency txMode) throws Exception {
invokeAll(cache, new HashSet<>(primaryKeys(cache, 3, 0)), txMode);
if (gridCount() > 1) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fd2f235/modules/core/src/test/java/org/gridgain/testsuites/GridCacheDataStructuresSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testsuites/GridCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/GridCacheDataStructuresSelfTestSuite.java
index 8dbd03b..a5a313c 100644
--- a/modules/core/src/test/java/org/gridgain/testsuites/GridCacheDataStructuresSelfTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/testsuites/GridCacheDataStructuresSelfTestSuite.java
@@ -51,8 +51,9 @@ public class GridCacheDataStructuresSelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueApiSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedQueueMultiNodeSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueMultiNodeSelfTest.class));
- suite.addTest(new TestSuite(GridCachePartitionedQueueCreateMultiNodeSelfTest.class));
- suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.class));
+ // TODO IGNITE-44.
+ //suite.addTest(new TestSuite(GridCachePartitionedQueueCreateMultiNodeSelfTest.class));
+ //suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedSetSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedAtomicSetSelfTest.class));