You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2014/12/17 03:31:26 UTC
incubator-ignite git commit: Merged GG-9090 into ignite-1
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1 03c17b8f3 -> e248ca73f
Merged GG-9090 into ignite-1
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e248ca73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e248ca73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e248ca73
Branch: refs/heads/ignite-1
Commit: e248ca73f12fddabb5cdd552df25cdbe2817225a
Parents: 03c17b8
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Dec 16 18:31:15 2014 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Dec 16 18:31:15 2014 -0800
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 19 ++-
.../cache/GridCacheProjectionImpl.java | 12 +-
.../distributed/dht/GridDhtCacheEntry.java | 60 ++++++--
.../dht/atomic/GridDhtAtomicCache.java | 63 +++++++-
.../cache/GridCacheAbstractFullApiSelfTest.java | 102 +++++++++++++
.../GridCacheNearPartitionedClearSelfTest.java | 142 +++++++++++++++++++
.../bamboo/GridDataGridTestSuite.java | 1 +
7 files changed, 377 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index eb131fd..bc6e120 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -1334,10 +1334,18 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
}
if (cctx.deferredDelete() && !detached() && !isInternal()) {
- if (!deletedUnlocked())
+ if (!deletedUnlocked()) {
deletedUnlocked(true);
- enqueueVer = newVer;
+ if (tx != null) {
+ GridCacheMvcc<K> mvcc = mvccExtras();
+
+ if (mvcc == null || mvcc.isEmpty(tx.xidVersion()))
+ clearReaders();
+ else
+ clearReader(tx.originatingNodeId());
+ }
+ }
}
drReplicate(drType, null, null, newVer);
@@ -2039,6 +2047,13 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
// No-op.
}
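+ /**
+ * Clears the reader registered for the given node. This base implementation is a no-op.
+ *
+ * @param nodeId Node ID to clear.
+ * @throws GridCacheEntryRemovedException Never thrown by this no-op implementation; declared for overrides.
+ */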
+ protected void clearReader(UUID nodeId) throws GridCacheEntryRemovedException {
+ // No-op.
+ }
+
/** {@inheritDoc} */
@Override public boolean clear(GridCacheVersion ver, boolean readers,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
index 67eb9e8..0e7c21c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
@@ -1045,7 +1045,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public IgniteFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx<K, V> entry,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
- return cache.removeAsync(key, entry, and(filter, true));
+ return cache.removeAsync(key, entry, and(filter, false));
}
/** {@inheritDoc} */
@@ -1079,7 +1079,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx<K, V> entry,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
- return cache.removexAsync(key, entry, and(filter, true));
+ return cache.removexAsync(key, entry, and(filter, false));
}
/** {@inheritDoc} */
@@ -1123,24 +1123,24 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public void removeAll(@Nullable Collection<? extends K> keys,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) throws IgniteCheckedException {
- cache.removeAll(keys, and(filter, true));
+ cache.removeAll(keys, and(filter, false));
}
/** {@inheritDoc} */
@Override public IgniteFuture<?> removeAllAsync(@Nullable Collection<? extends K> keys,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
- return cache.removeAllAsync(keys, and(filter, true));
+ return cache.removeAllAsync(keys, and(filter, false));
}
/** {@inheritDoc} */
@Override public void removeAll(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter)
throws IgniteCheckedException {
- cache.removeAll(and(filter, true));
+ cache.removeAll(and(filter, false));
}
/** {@inheritDoc} */
@Override public IgniteFuture<?> removeAllAsync(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
- return cache.removeAllAsync(and(filter, true));
+ return cache.removeAllAsync(and(filter, false));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index be2596d..978b60b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -393,12 +393,13 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
if (reader == null) {
reader = new ReaderId<>(nodeId, msgId);
- rdrs = new LinkedList<>(rdrs);
+ List<ReaderId<K, V>> rdrs = new ArrayList<>(this.rdrs.size() + 1);
+ rdrs.addAll(this.rdrs);
rdrs.add(reader);
// Seal.
- rdrs = Collections.unmodifiableList(rdrs);
+ this.rdrs = Collections.unmodifiableList(rdrs);
// No transactions in ATOMIC cache.
if (!cctx.atomic()) {
@@ -472,15 +473,18 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
ReaderId reader = readerId(nodeId);
- if (reader == null || reader.messageId() > msgId)
+ if (reader == null || (reader.messageId() > msgId && msgId >= 0))
return false;
- rdrs = new LinkedList<>(rdrs);
+ List<ReaderId<K, V>> rdrs = new ArrayList<>(this.rdrs.size());
- rdrs.remove(reader);
+ for (ReaderId<K, V> rdr : this.rdrs) {
+ if (!rdr.equals(reader))
+ rdrs.add(rdr);
+ }
// Seal.
- rdrs = Collections.unmodifiableList(rdrs);
+ this.rdrs = rdrs.isEmpty() ? Collections.<ReaderId<K, V>>emptyList() : Collections.unmodifiableList(rdrs);
return true;
}
@@ -492,6 +496,11 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
rdrs = Collections.emptyList();
}
+ /** {@inheritDoc} */
+ @Override public synchronized void clearReader(UUID nodeId) throws GridCacheEntryRemovedException {
+ removeReader(nodeId, -1);
+ }
+
/**
* Marks entry as obsolete and, if possible or required, removes it
* from swap storage.
@@ -553,24 +562,28 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
checkObsolete();
if (!rdrs.isEmpty()) {
- List<ReaderId> rmv = null;
+ Collection<ReaderId> rmv = null;
for (ReaderId reader : rdrs) {
if (!cctx.discovery().alive(reader.nodeId())) {
if (rmv == null)
- rmv = new LinkedList<>();
+ rmv = new HashSet<>();
rmv.add(reader);
}
}
if (rmv != null) {
- rdrs = new LinkedList<>(rdrs);
+ List<ReaderId<K, V>> rdrs = new ArrayList<>(this.rdrs.size() - rmv.size());
- for (ReaderId rdr : rmv)
- rdrs.remove(rdr);
+ for (ReaderId<K, V> rdr : this.rdrs) {
+ if (!rmv.contains(rdr))
+ rdrs.add(rdr);
+ }
- rdrs = Collections.unmodifiableList(rdrs);
+ // Seal.
+ this.rdrs = rdrs.isEmpty() ? Collections.<ReaderId<K, V>>emptyList() :
+ Collections.unmodifiableList(rdrs);
}
}
@@ -707,6 +720,29 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
return txFut;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof ReaderId))
+ return false;
+
+ ReaderId readerId = (ReaderId)o;
+
+ return msgId == readerId.msgId && nodeId.equals(readerId.nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = nodeId.hashCode();
+
+ res = 31 * res + (int)(msgId ^ (msgId >>> 32));
+
+ return res;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ReaderId.class, this);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4991adb..b8998e2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -13,8 +13,6 @@ import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.security.*;
-import org.apache.ignite.portables.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.managers.communication.*;
import org.gridgain.grid.kernal.processors.cache.*;
@@ -1002,6 +1000,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Cannot update in batches during DR due to possible conflicts.
assert !req.returnValue(); // Should not request return values for putAll.
+ if (!F.isEmpty(req.filter())) {
+ try {
+ reloadIfNeeded(locked);
+ }
+ catch (IgniteCheckedException e) {
+ res.addFailedKeys(req.keys(), e);
+
+ return new UpdateBatchResult<>();
+ }
+ }
+
int size = req.keys().size();
Map<K, V> putMap = null;
@@ -1244,6 +1253,56 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
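+ * Loads values from the store for those of the given entries whose current raw value is {@code null}
+ * and sets the loaded value on the entry, keeping the entry's current version. {@code null} list
+ * elements are skipped.
+ *
+ * @param entries Entries.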
+ * @throws IgniteCheckedException If failed.
+ */
+ private void reloadIfNeeded(final List<GridDhtCacheEntry<K, V>> entries) throws IgniteCheckedException {
+ Map<K, Integer> needReload = null;
+
+ for (int i = 0; i < entries.size(); i++) {
+ GridDhtCacheEntry<K, V> entry = entries.get(i);
+
+ if (entry == null)
+ continue;
+
+ V val = entry.rawGetOrUnmarshal(false);
+
+ if (val == null) {
+ if (needReload == null)
+ needReload = new HashMap<>(entries.size(), 1.0f);
+
+ needReload.put(entry.key(), i);
+ }
+ }
+
+ if (needReload != null) {
+ final Map<K, Integer> idxMap = needReload;
+
+ ctx.store().loadAllFromStore(null, needReload.keySet(), new CI2<K, V>() {
+ @Override public void apply(K k, V v) {
+ Integer idx = idxMap.get(k);
+
+ if (idx != null) {
+ GridDhtCacheEntry<K, V> entry = entries.get(idx);
+ try {
+ GridCacheVersion ver = entry.version();
+
+ entry.versionedValue(v, null, ver);
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert false : "Entry should not get obsolete while holding lock [entry=" + entry +
+ ", e=" + e + ']';
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+ });
+ }
+ }
+
+ /**
* Updates locked entries one-by-one.
*
* @param node Originating node.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index a5e5c82..406440b 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -17,6 +17,7 @@ import org.apache.ignite.lang.*;
import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.apache.ignite.spi.swapspace.inmemory.*;
+import org.gridgain.grid.kernal.*;
import org.gridgain.grid.util.lang.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
@@ -2932,6 +2933,107 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
/**
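+ * Checks that {@code removeAll()} leaves the cache empty on every grid after two keys that are
+ * primary on the local node were put and then cleared.
+ *
+ * @throws Exception If failed.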
+ */
+ public void testRemoveAfterClear() throws Exception {
+ GridEx grid = grid(0);
+
+ GridCacheDistributionMode distroMode = grid.cache(null).configuration().getDistributionMode();
+
+ if (distroMode == GridCacheDistributionMode.NEAR_ONLY || distroMode == GridCacheDistributionMode.CLIENT_ONLY) {
+ if (gridCount() < 2)
+ return;
+
+ grid = grid(1);
+ }
+
+ GridCacheProjection<Integer, Integer> cache = grid.cache(null)
+ .projection(Integer.class, Integer.class);
+
+ int key = 0;
+
+ List<Integer> keys = new ArrayList<>();
+
+ for (int k = 0; k < 2; k++) {
+ while (!grid.cache(null).affinity().isPrimary(grid.localNode(), key))
+ key++;
+
+ keys.add(key);
+
+ key++;
+ }
+
+ System.out.println(keys);
+
+ for (Integer k : keys)
+ cache.put(k, k);
+
+ cache.clear(keys.get(0));
+ cache.clear(keys.get(1));
+
+ for (int g = 0; g < gridCount(); g++) {
+ Ignite grid0 = grid(g);
+
+ grid0.cache(null).projection(Integer.class, Integer.class).removeAll();
+
+ assertTrue(grid0.cache(null).isEmpty());
+ }
+ }
+
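+ /**
+ * Checks that {@code removeAll()} with a value-based filter leaves the cache empty on every grid
+ * after two keys that are primary on the local node were put and then cleared.
+ *
+ * @throws Exception If failed.
+ */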
+ public void testRemoveFilteredAfterClear() throws Exception {
+ GridEx grid = grid(0);
+
+ GridCacheDistributionMode distroMode = grid.cache(null).configuration().getDistributionMode();
+
+ if (distroMode == GridCacheDistributionMode.NEAR_ONLY || distroMode == GridCacheDistributionMode.CLIENT_ONLY) {
+ if (gridCount() < 2)
+ return;
+
+ grid = grid(1);
+ }
+
+ GridCacheProjection<Integer, Integer> cache = grid.cache(null);
+
+ List<Integer> keys = new ArrayList<>();
+
+ int key = 0;
+
+ for (int k = 0; k < 2; k++) {
+ while (!grid.cache(null).affinity().isPrimary(grid.localNode(), key))
+ key++;
+
+ keys.add(key);
+
+ key++;
+ }
+
+ System.out.println(keys);
+
+ for (Integer k : keys)
+ cache.put(k, k + 1);
+
+ cache.clear(keys.get(0));
+ cache.clear(keys.get(1));
+
+ for (int g = 0; g < gridCount(); g++) {
+ Ignite grid0 = grid(g);
+
+ grid0.cache(null).removeAll(new IgnitePredicate<GridCacheEntry<Object,Object>>() {
+ @Override public boolean apply(GridCacheEntry<Object, Object> e) {
+ Object val = e.peek();
+
+ return val instanceof Integer && (Integer)val > 0;
+ }
+ });
+
+ assertTrue(grid0.cache(null).isEmpty());
+ }
+ }
+
+ /**
* @throws Exception In case of error.
*/
public void testClear() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheNearPartitionedClearSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheNearPartitionedClearSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheNearPartitionedClearSelfTest.java
new file mode 100644
index 0000000..429ff69
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheNearPartitionedClearSelfTest.java
@@ -0,0 +1,142 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.store.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.testframework.junits.common.*;
+
+import static org.gridgain.grid.cache.GridCacheMode.*;
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCachePreloadMode.*;
+import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Test clear operation in NEAR_PARTITIONED transactional cache.
+ */
+@SuppressWarnings("unchecked")
+public class GridCacheNearPartitionedClearSelfTest extends GridCommonAbstractTest {
+ /** Grid count. */
+ private static final int GRID_CNT = 3;
+
+ /** Backup count. */
+ private static final int BACKUP_CNT = 1;
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "cache";
+
+ /** Cache store set on the cache configuration of every grid started by this test. */
+ private static GridCacheStore<Object, Object> store = new GridCacheGenericTestStore<>();
+
+ /** Shared IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(GRID_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ G.stopAll(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setLocalHost("127.0.0.1");
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ GridCacheConfiguration ccfg = new GridCacheConfiguration();
+
+ ccfg.setName(CACHE_NAME);
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setDistributionMode(NEAR_PARTITIONED);
+ ccfg.setPreloadMode(SYNC);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setBackups(BACKUP_CNT);
+ ccfg.setStore(store);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /**
+ * Test clear.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClear() throws Exception {
+ GridCache cache = cacheForIndex(0);
+
+ int key = primaryKey0(grid(0), cache);
+
+ cache.putx(key, 1);
+ cache.clear(key);
+
+ for (int i = 0; i < GRID_CNT; i++) {
+ GridCache cache0 = cacheForIndex(i);
+
+ cache0.removeAll();
+
+ assert cache0.isEmpty();
+ }
+
+ cache.putx(key, 1);
+ cache.clear(key);
+
+ assertEquals(0, cache.size());
+ }
+
+ /**
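+ * Gets the smallest non-negative integer key for which the local node of the given grid is primary.
+ *
+ * @param ignite Grid whose local node must be primary for the returned key.
+ * @param cache Cache whose affinity is queried.
+ * @return Primary key.
+ * @throws Exception If no such key was found.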
+ */
+ private int primaryKey0(Ignite ignite, GridCache cache) throws Exception {
+ ClusterNode locNode = ignite.cluster().localNode();
+
+ for (int i = 0; i < Integer.MAX_VALUE; i++) {
+ if (cache.affinity().isPrimary(locNode, i))
+ return i;
+ }
+
+ throw new Exception("Cannot determine affinity key.");
+ }
+
+ /**
+ * Gets cache for the node with the given index.
+ *
+ * @param idx Index.
+ * @return Cache.
+ */
+ private GridCache cacheForIndex(int idx) {
+ return grid(idx).cache(CACHE_NAME);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
index 3059c4e..ea7f77d 100644
--- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
@@ -184,6 +184,7 @@ public class GridDataGridTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridCachePartitionedAffinityHashIdResolverSelfTest.class));
suite.addTest(new TestSuite(GridCacheColocatedOptimisticTransactionSelfTest.class));
suite.addTestSuite(GridCacheAtomicMessageCountSelfTest.class);
+ suite.addTest(new TestSuite(GridCacheNearPartitionedClearSelfTest.class));
suite.addTest(new TestSuite(GridCacheDhtExpiredEntriesPreloadSelfTest.class));
suite.addTest(new TestSuite(GridCacheNearExpiredEntriesPreloadSelfTest.class));