You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/17 16:11:57 UTC
[1/5] incubator-ignite git commit: Merged GG-9090 into ignite-1
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-41 688a2e715 -> 764297932
Merged GG-9090 into ignite-1
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e248ca73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e248ca73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e248ca73
Branch: refs/heads/ignite-41
Commit: e248ca73f12fddabb5cdd552df25cdbe2817225a
Parents: 03c17b8
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Dec 16 18:31:15 2014 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Dec 16 18:31:15 2014 -0800
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 19 ++-
.../cache/GridCacheProjectionImpl.java | 12 +-
.../distributed/dht/GridDhtCacheEntry.java | 60 ++++++--
.../dht/atomic/GridDhtAtomicCache.java | 63 +++++++-
.../cache/GridCacheAbstractFullApiSelfTest.java | 102 +++++++++++++
.../GridCacheNearPartitionedClearSelfTest.java | 142 +++++++++++++++++++
.../bamboo/GridDataGridTestSuite.java | 1 +
7 files changed, 377 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index eb131fd..bc6e120 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -1334,10 +1334,18 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
}
if (cctx.deferredDelete() && !detached() && !isInternal()) {
- if (!deletedUnlocked())
+ if (!deletedUnlocked()) {
deletedUnlocked(true);
- enqueueVer = newVer;
+ if (tx != null) {
+ GridCacheMvcc<K> mvcc = mvccExtras();
+
+ if (mvcc == null || mvcc.isEmpty(tx.xidVersion()))
+ clearReaders();
+ else
+ clearReader(tx.originatingNodeId());
+ }
+ }
}
drReplicate(drType, null, null, newVer);
@@ -2039,6 +2047,13 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
// No-op.
}
+ /**
+ * @param nodeId Node ID to clear.
+ */
+ protected void clearReader(UUID nodeId) throws GridCacheEntryRemovedException {
+ // No-op.
+ }
+
/** {@inheritDoc} */
@Override public boolean clear(GridCacheVersion ver, boolean readers,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
index 67eb9e8..0e7c21c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
@@ -1045,7 +1045,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public IgniteFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx<K, V> entry,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
- return cache.removeAsync(key, entry, and(filter, true));
+ return cache.removeAsync(key, entry, and(filter, false));
}
/** {@inheritDoc} */
@@ -1079,7 +1079,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx<K, V> entry,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
- return cache.removexAsync(key, entry, and(filter, true));
+ return cache.removexAsync(key, entry, and(filter, false));
}
/** {@inheritDoc} */
@@ -1123,24 +1123,24 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public void removeAll(@Nullable Collection<? extends K> keys,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) throws IgniteCheckedException {
- cache.removeAll(keys, and(filter, true));
+ cache.removeAll(keys, and(filter, false));
}
/** {@inheritDoc} */
@Override public IgniteFuture<?> removeAllAsync(@Nullable Collection<? extends K> keys,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
- return cache.removeAllAsync(keys, and(filter, true));
+ return cache.removeAllAsync(keys, and(filter, false));
}
/** {@inheritDoc} */
@Override public void removeAll(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter)
throws IgniteCheckedException {
- cache.removeAll(and(filter, true));
+ cache.removeAll(and(filter, false));
}
/** {@inheritDoc} */
@Override public IgniteFuture<?> removeAllAsync(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
- return cache.removeAllAsync(and(filter, true));
+ return cache.removeAllAsync(and(filter, false));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index be2596d..978b60b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -393,12 +393,13 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
if (reader == null) {
reader = new ReaderId<>(nodeId, msgId);
- rdrs = new LinkedList<>(rdrs);
+ List<ReaderId<K, V>> rdrs = new ArrayList<>(this.rdrs.size() + 1);
+ rdrs.addAll(this.rdrs);
rdrs.add(reader);
// Seal.
- rdrs = Collections.unmodifiableList(rdrs);
+ this.rdrs = Collections.unmodifiableList(rdrs);
// No transactions in ATOMIC cache.
if (!cctx.atomic()) {
@@ -472,15 +473,18 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
ReaderId reader = readerId(nodeId);
- if (reader == null || reader.messageId() > msgId)
+ if (reader == null || (reader.messageId() > msgId && msgId >= 0))
return false;
- rdrs = new LinkedList<>(rdrs);
+ List<ReaderId<K, V>> rdrs = new ArrayList<>(this.rdrs.size());
- rdrs.remove(reader);
+ for (ReaderId<K, V> rdr : this.rdrs) {
+ if (!rdr.equals(reader))
+ rdrs.add(rdr);
+ }
// Seal.
- rdrs = Collections.unmodifiableList(rdrs);
+ this.rdrs = rdrs.isEmpty() ? Collections.<ReaderId<K, V>>emptyList() : Collections.unmodifiableList(rdrs);
return true;
}
@@ -492,6 +496,11 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
rdrs = Collections.emptyList();
}
+ /** {@inheritDoc} */
+ @Override public synchronized void clearReader(UUID nodeId) throws GridCacheEntryRemovedException {
+ removeReader(nodeId, -1);
+ }
+
/**
* Marks entry as obsolete and, if possible or required, removes it
* from swap storage.
@@ -553,24 +562,28 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
checkObsolete();
if (!rdrs.isEmpty()) {
- List<ReaderId> rmv = null;
+ Collection<ReaderId> rmv = null;
for (ReaderId reader : rdrs) {
if (!cctx.discovery().alive(reader.nodeId())) {
if (rmv == null)
- rmv = new LinkedList<>();
+ rmv = new HashSet<>();
rmv.add(reader);
}
}
if (rmv != null) {
- rdrs = new LinkedList<>(rdrs);
+ List<ReaderId<K, V>> rdrs = new ArrayList<>(this.rdrs.size() - rmv.size());
- for (ReaderId rdr : rmv)
- rdrs.remove(rdr);
+ for (ReaderId<K, V> rdr : this.rdrs) {
+ if (!rmv.contains(rdr))
+ rdrs.add(rdr);
+ }
- rdrs = Collections.unmodifiableList(rdrs);
+ // Seal.
+ this.rdrs = rdrs.isEmpty() ? Collections.<ReaderId<K, V>>emptyList() :
+ Collections.unmodifiableList(rdrs);
}
}
@@ -707,6 +720,29 @@ public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
return txFut;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof ReaderId))
+ return false;
+
+ ReaderId readerId = (ReaderId)o;
+
+ return msgId == readerId.msgId && nodeId.equals(readerId.nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = nodeId.hashCode();
+
+ res = 31 * res + (int)(msgId ^ (msgId >>> 32));
+
+ return res;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ReaderId.class, this);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4991adb..b8998e2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -13,8 +13,6 @@ import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.security.*;
-import org.apache.ignite.portables.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.managers.communication.*;
import org.gridgain.grid.kernal.processors.cache.*;
@@ -1002,6 +1000,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Cannot update in batches during DR due to possible conflicts.
assert !req.returnValue(); // Should not request return values for putAll.
+ if (!F.isEmpty(req.filter())) {
+ try {
+ reloadIfNeeded(locked);
+ }
+ catch (IgniteCheckedException e) {
+ res.addFailedKeys(req.keys(), e);
+
+ return new UpdateBatchResult<>();
+ }
+ }
+
int size = req.keys().size();
Map<K, V> putMap = null;
@@ -1244,6 +1253,56 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
+ * @param entries Entries.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void reloadIfNeeded(final List<GridDhtCacheEntry<K, V>> entries) throws IgniteCheckedException {
+ Map<K, Integer> needReload = null;
+
+ for (int i = 0; i < entries.size(); i++) {
+ GridDhtCacheEntry<K, V> entry = entries.get(i);
+
+ if (entry == null)
+ continue;
+
+ V val = entry.rawGetOrUnmarshal(false);
+
+ if (val == null) {
+ if (needReload == null)
+ needReload = new HashMap<>(entries.size(), 1.0f);
+
+ needReload.put(entry.key(), i);
+ }
+ }
+
+ if (needReload != null) {
+ final Map<K, Integer> idxMap = needReload;
+
+ ctx.store().loadAllFromStore(null, needReload.keySet(), new CI2<K, V>() {
+ @Override public void apply(K k, V v) {
+ Integer idx = idxMap.get(k);
+
+ if (idx != null) {
+ GridDhtCacheEntry<K, V> entry = entries.get(idx);
+ try {
+ GridCacheVersion ver = entry.version();
+
+ entry.versionedValue(v, null, ver);
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert false : "Entry should not get obsolete while holding lock [entry=" + entry +
+ ", e=" + e + ']';
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+ });
+ }
+ }
+
+ /**
* Updates locked entries one-by-one.
*
* @param node Originating node.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index a5e5c82..406440b 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -17,6 +17,7 @@ import org.apache.ignite.lang.*;
import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.apache.ignite.spi.swapspace.inmemory.*;
+import org.gridgain.grid.kernal.*;
import org.gridgain.grid.util.lang.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
@@ -2932,6 +2933,107 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
/**
+ * @throws Exception If failed.
+ */
+ public void testRemoveAfterClear() throws Exception {
+ GridEx grid = grid(0);
+
+ GridCacheDistributionMode distroMode = grid.cache(null).configuration().getDistributionMode();
+
+ if (distroMode == GridCacheDistributionMode.NEAR_ONLY || distroMode == GridCacheDistributionMode.CLIENT_ONLY) {
+ if (gridCount() < 2)
+ return;
+
+ grid = grid(1);
+ }
+
+ GridCacheProjection<Integer, Integer> cache = grid.cache(null)
+ .projection(Integer.class, Integer.class);
+
+ int key = 0;
+
+ List<Integer> keys = new ArrayList<>();
+
+ for (int k = 0; k < 2; k++) {
+ while (!grid.cache(null).affinity().isPrimary(grid.localNode(), key))
+ key++;
+
+ keys.add(key);
+
+ key++;
+ }
+
+ System.out.println(keys);
+
+ for (Integer k : keys)
+ cache.put(k, k);
+
+ cache.clear(keys.get(0));
+ cache.clear(keys.get(1));
+
+ for (int g = 0; g < gridCount(); g++) {
+ Ignite grid0 = grid(g);
+
+ grid0.cache(null).projection(Integer.class, Integer.class).removeAll();
+
+ assertTrue(grid0.cache(null).isEmpty());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoveFilteredAfterClear() throws Exception {
+ GridEx grid = grid(0);
+
+ GridCacheDistributionMode distroMode = grid.cache(null).configuration().getDistributionMode();
+
+ if (distroMode == GridCacheDistributionMode.NEAR_ONLY || distroMode == GridCacheDistributionMode.CLIENT_ONLY) {
+ if (gridCount() < 2)
+ return;
+
+ grid = grid(1);
+ }
+
+ GridCacheProjection<Integer, Integer> cache = grid.cache(null);
+
+ List<Integer> keys = new ArrayList<>();
+
+ int key = 0;
+
+ for (int k = 0; k < 2; k++) {
+ while (!grid.cache(null).affinity().isPrimary(grid.localNode(), key))
+ key++;
+
+ keys.add(key);
+
+ key++;
+ }
+
+ System.out.println(keys);
+
+ for (Integer k : keys)
+ cache.put(k, k + 1);
+
+ cache.clear(keys.get(0));
+ cache.clear(keys.get(1));
+
+ for (int g = 0; g < gridCount(); g++) {
+ Ignite grid0 = grid(g);
+
+ grid0.cache(null).removeAll(new IgnitePredicate<GridCacheEntry<Object,Object>>() {
+ @Override public boolean apply(GridCacheEntry<Object, Object> e) {
+ Object val = e.peek();
+
+ return val instanceof Integer && (Integer)val > 0;
+ }
+ });
+
+ assertTrue(grid0.cache(null).isEmpty());
+ }
+ }
+
+ /**
* @throws Exception In case of error.
*/
public void testClear() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheNearPartitionedClearSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheNearPartitionedClearSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheNearPartitionedClearSelfTest.java
new file mode 100644
index 0000000..429ff69
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheNearPartitionedClearSelfTest.java
@@ -0,0 +1,142 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.store.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.testframework.junits.common.*;
+
+import static org.gridgain.grid.cache.GridCacheMode.*;
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCachePreloadMode.*;
+import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Test clear operation in NEAR_PARTITIONED transactional cache.
+ */
+@SuppressWarnings("unchecked")
+public class GridCacheNearPartitionedClearSelfTest extends GridCommonAbstractTest {
+ /** Grid count. */
+ private static final int GRID_CNT = 3;
+
+ /** Backup count. */
+ private static final int BACKUP_CNT = 1;
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "cache";
+
+ /** */
+ private static GridCacheStore<Object, Object> store = new GridCacheGenericTestStore<>();
+
+ /** Shared IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(GRID_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ G.stopAll(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setLocalHost("127.0.0.1");
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ GridCacheConfiguration ccfg = new GridCacheConfiguration();
+
+ ccfg.setName(CACHE_NAME);
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setDistributionMode(NEAR_PARTITIONED);
+ ccfg.setPreloadMode(SYNC);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setBackups(BACKUP_CNT);
+ ccfg.setStore(store);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /**
+ * Test clear.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClear() throws Exception {
+ GridCache cache = cacheForIndex(0);
+
+ int key = primaryKey0(grid(0), cache);
+
+ cache.putx(key, 1);
+ cache.clear(key);
+
+ for (int i = 0; i < GRID_CNT; i++) {
+ GridCache cache0 = cacheForIndex(i);
+
+ cache0.removeAll();
+
+ assert cache0.isEmpty();
+ }
+
+ cache.putx(key, 1);
+ cache.clear(key);
+
+ assertEquals(0, cache.size());
+ }
+
+ /**
+ * Gets primary key for the given cache.
+ *
+ * @param cache Cache.
+ * @return Primary key.
+ * @throws Exception If failed.
+ */
+ private int primaryKey0(Ignite ignite, GridCache cache) throws Exception {
+ ClusterNode locNode = ignite.cluster().localNode();
+
+ for (int i = 0; i < Integer.MAX_VALUE; i++) {
+ if (cache.affinity().isPrimary(locNode, i))
+ return i;
+ }
+
+ throw new Exception("Cannot determine affinity key.");
+ }
+
+ /**
+ * Gets cache for the node with the given index.
+ *
+ * @param idx Index.
+ * @return Cache.
+ */
+ private GridCache cacheForIndex(int idx) {
+ return grid(idx).cache(CACHE_NAME);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e248ca73/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
index 3059c4e..ea7f77d 100644
--- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
@@ -184,6 +184,7 @@ public class GridDataGridTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridCachePartitionedAffinityHashIdResolverSelfTest.class));
suite.addTest(new TestSuite(GridCacheColocatedOptimisticTransactionSelfTest.class));
suite.addTestSuite(GridCacheAtomicMessageCountSelfTest.class);
+ suite.addTest(new TestSuite(GridCacheNearPartitionedClearSelfTest.class));
suite.addTest(new TestSuite(GridCacheDhtExpiredEntriesPreloadSelfTest.class));
suite.addTest(new TestSuite(GridCacheNearExpiredEntriesPreloadSelfTest.class));
[4/5] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-1' into ignite-41
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-1' into ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/91227dfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/91227dfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/91227dfb
Branch: refs/heads/ignite-41
Commit: 91227dfb580c81d73a1910620ac2ad246318e3df
Parents: d84da7d e248ca7
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 17 13:53:37 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 17 13:53:37 2014 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 19 ++-
.../cache/GridCacheProjectionImpl.java | 12 +-
.../distributed/dht/GridDhtCacheEntry.java | 60 ++++++--
.../dht/atomic/GridDhtAtomicCache.java | 63 +++++++-
.../cache/GridCacheAbstractFullApiSelfTest.java | 102 +++++++++++++
.../GridCacheNearPartitionedClearSelfTest.java | 142 +++++++++++++++++++
.../bamboo/GridDataGridTestSuite.java | 1 +
7 files changed, 377 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/91227dfb/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/91227dfb/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/91227dfb/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/91227dfb/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
----------------------------------------------------------------------
[5/5] incubator-ignite git commit: # ignite-41
Posted by sb...@apache.org.
# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/76429793
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/76429793
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/76429793
Branch: refs/heads/ignite-41
Commit: 764297932db43970d4cdcdb54475064bb76d44a1
Parents: 91227df
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 17 18:11:41 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 17 18:11:41 2014 +0300
----------------------------------------------------------------------
.../cache/GridCacheAccessExpiryPolicy.java | 85 ++++++
.../processors/cache/GridCacheAdapter.java | 3 +-
.../processors/cache/GridCacheContext.java | 11 +-
.../processors/cache/GridCacheEntryEx.java | 6 +-
.../processors/cache/GridCacheMapEntry.java | 39 ++-
.../processors/cache/GridCacheTxAdapter.java | 3 +-
.../cache/GridCacheTxLocalAdapter.java | 19 +-
.../distributed/GridCacheTtlUpdateRequest.java | 285 +++++++++++++++++++
.../distributed/dht/GridDhtCacheAdapter.java | 58 +++-
.../dht/GridDhtTransactionalCacheAdapter.java | 3 +-
.../dht/GridPartitionedGetFuture.java | 14 +-
.../dht/atomic/GridDhtAtomicCache.java | 83 +++++-
.../dht/colocated/GridDhtColocatedCache.java | 27 +-
.../distributed/near/GridNearCacheAdapter.java | 11 +-
.../distributed/near/GridNearGetFuture.java | 10 +-
.../local/atomic/GridLocalAtomicCache.java | 12 +-
.../GridTcpCommunicationMessageFactory.java | 7 +-
.../IgniteCacheExpiryPolicyAbstractTest.java | 110 +++----
.../IgniteCacheExpiryPolicyTestSuite.java | 2 +-
.../processors/cache/GridCacheTestEntryEx.java | 3 +-
20 files changed, 649 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
new file mode 100644
index 0000000..0b6152d
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
@@ -0,0 +1,85 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.expiry.*;
+
+/**
+ *
+ */
+public class GridCacheAccessExpiryPolicy {
+ /** */
+ private final long ttl;
+
+ /** */
+ private GridCacheTtlUpdateRequest req;
+
+ /**
+ * @param expiryPlc Expiry policy.
+ * @return Access expire policy.
+ */
+ public static GridCacheAccessExpiryPolicy forPolicy(@Nullable ExpiryPolicy expiryPlc) {
+ if (expiryPlc == null)
+ return null;
+
+ Duration duration = expiryPlc.getExpiryForAccess();
+
+ if (duration == null)
+ return null;
+
+ return new GridCacheAccessExpiryPolicy(GridCacheMapEntry.toTtl(duration));
+ }
+
+ /**
+ * @param ttl TTL for access.
+ */
+ public GridCacheAccessExpiryPolicy(long ttl) {
+ assert ttl >= 0 : ttl;
+
+ this.ttl = ttl;
+ }
+
+ /**
+ * @return TTL.
+ */
+ public long ttl() {
+ return ttl;
+ }
+
+ /**
+ * @param key Entry key.
+ * @param keyBytes Entry key bytes.
+ * @param ver Entry version.
+ */
+ @SuppressWarnings("unchecked")
+ public void ttlUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) {
+ if (req == null)
+ req = new GridCacheTtlUpdateRequest(ttl);
+
+ req.addEntry(key, keyBytes, ver);
+ }
+
+ /**
+ * @return TTL update request.
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable public <K, V> GridCacheTtlUpdateRequest<K, V> request() {
+ return (GridCacheTtlUpdateRequest<K, V>)req;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheAccessExpiryPolicy.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 39b7338..1f40a7c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -1812,7 +1812,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
subjId,
null,
taskName,
- filter);
+ filter,
+ null);
GridCacheVersion ver = entry.version();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
index 931e243..cb4337e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
@@ -1090,8 +1090,17 @@ public class GridCacheContext<K, V> implements Externalizable {
if (subjId != null)
return subjId;
- GridCacheProjectionImpl<K, V> prj = projectionPerCall();
+ return subjectIdPerCall(subjId, projectionPerCall());
+ }
+ /**
+ * Gets subject ID per call.
+ *
+ * @param subjId Optional already existing subject ID.
+ * @param prj Optional thread local projection.
+ * @return Subject ID per call.
+ */
+ public UUID subjectIdPerCall(@Nullable UUID subjId, @Nullable GridCacheProjectionImpl<K, V> prj) {
if (prj != null)
subjId = prj.subjectId();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index 76d73b3..2b38247 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -270,10 +270,11 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
* temporary object can used for filter evaluation or transform closure execution and
* should not be returned to user.
* @param subjId Subject ID initiated this read.
+ * @param transformClo Transform closure to record event.
* @param taskName Task name.
* @param filter Filter to check prior to getting the value. Note that filter check
* together with getting the value is an atomic operation.
- * @param transformClo Transform closure to record event.
+ * @param expiryPlc Expiry policy.
* @return Cached value.
* @throws IgniteCheckedException If loading value failed.
* @throws GridCacheEntryRemovedException If entry was removed.
@@ -290,7 +291,8 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
UUID subjId,
Object transformClo,
String taskName,
- IgnitePredicate<GridCacheEntry<K, V>>[] filter)
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+ @Nullable GridCacheAccessExpiryPolicy expiryPlc)
throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index 7f9ff4d..00f7382 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -699,7 +699,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
UUID subjId,
Object transformClo,
String taskName,
- IgnitePredicate<GridCacheEntry<K, V>>[] filter)
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+ @Nullable GridCacheAccessExpiryPolicy expirePlc)
throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException {
cctx.denyOnFlag(LOCAL);
@@ -714,7 +715,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
subjId,
transformClo,
taskName,
- filter);
+ filter,
+ expirePlc);
}
/** {@inheritDoc} */
@@ -730,7 +732,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
UUID subjId,
Object transformClo,
String taskName,
- IgnitePredicate<GridCacheEntry<K, V>>[] filter)
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+ @Nullable GridCacheAccessExpiryPolicy expiryPlc)
throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException {
// Disable read-through if there is no store.
if (readThrough && !cctx.isStoreEnabled())
@@ -877,6 +880,16 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
// No more notifications.
evt = false;
}
+
+ if (ret != null && expiryPlc != null) {
+ long ttl = expiryPlc.ttl();
+
+ assert ttl >= 0 : ttl;
+
+ updateTtl(ttl);
+
+ expiryPlc.ttlUpdated(key(), getOrMarshalKeyBytes(), version());
+ }
}
if (asyncRefresh && !readThrough && cctx.isStoreEnabled()) {
@@ -906,7 +919,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
subjId,
transformClo,
taskName,
- filter);
+ filter,
+ expiryPlc);
}
boolean loadedFromStore = false;
@@ -987,7 +1001,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
subjId,
transformClo,
taskName,
- filter);
+ filter,
+ expiryPlc);
}
/** {@inheritDoc} */
@@ -1658,6 +1673,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
return toTtl(duration);
}
+ /**
+ * @param duration Duration.
+ * @return TTL.
+ */
public static long toTtl(Duration duration) {
if (duration == null)
return -1;
@@ -1826,17 +1845,19 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
boolean pass = cctx.isAll(wrapFilterLocked(), filter);
if (!pass) {
- if (!isNew() && expiryPlc != null) {
+ if (hasValueUnlocked() && expiryPlc != null) {
Duration duration = expiryPlc.getExpiryForAccess();
- if (duration != null)
- updateTtl(toTtl(duration));
+ newTtl = toTtl(duration);
+
+ if (newTtl != -1L)
+ updateTtl(newTtl);
}
return new GridCacheUpdateAtomicResult<>(false,
retval ? old : null,
null,
- 0L,
+ newTtl,
-1L,
null,
null,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
index 7a32afa..edf1e92 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
@@ -1166,7 +1166,8 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
/*subjId*/subjId,
/**closure name */recordEvt ? F.first(txEntry.transformClosures()) : null,
resolveTaskName(),
- CU.<K, V>empty());
+ CU.<K, V>empty(),
+ null);
try {
for (IgniteClosure<V, V> clos : txEntry.transformClosures())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index 7b6f266..1c96b32 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -663,8 +663,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
txEntry.ttl(GridCacheMapEntry.toTtl(duration));
-
- log.info("Calculated expiry (userCommit), update=" + cached.hasValue() + ", ttl=" + txEntry.ttl() + ", plc=" + expiry);
}
}
@@ -1081,7 +1079,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
CU.subjectId(this, cctx),
transformClo,
resolveTaskName(),
- filter);
+ filter,
+ null);
if (val != null) {
if (!readCommitted())
@@ -1155,7 +1154,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
CU.subjectId(this, cctx),
null,
resolveTaskName(),
- filter);
+ filter,
+ null);
if (val != null) {
V val0 = val;
@@ -1501,7 +1501,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
CU.subjectId(GridCacheTxLocalAdapter.this, cctx),
transformClo,
resolveTaskName(),
- filter);
+ filter,
+ null);
// If value is in cache and passed the filter.
if (val != null) {
@@ -1844,7 +1845,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
if (optimistic()) {
try {
- //Should read through if filter is specified.
+ // Should read through if filter is specified.
old = entry.innerGet(this,
/*swap*/false,
/*read-through*/readThrough,
@@ -1856,7 +1857,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
CU.subjectId(this, cctx),
transformClo,
resolveTaskName(),
- CU.<K, V>empty());
+ CU.<K, V>empty(),
+ null);
}
catch (GridCacheFilterFailedException e) {
e.printStackTrace();
@@ -2065,7 +2067,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
CU.subjectId(this, cctx),
null,
resolveTaskName(),
- CU.<K, V>empty());
+ CU.<K, V>empty(),
+ null);
}
catch (GridCacheFilterFailedException e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
new file mode 100644
index 0000000..71e314e
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -0,0 +1,285 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.util.direct.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Cache message that carries a new TTL together with the keys and versions of the entries it applies to.
+ */
+public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
+ /** Marshalled entry keys, one byte array per key; this list (not {@code keys}) is what {@code writeTo()} writes. */
+ @GridDirectCollection(byte[].class)
+ private List<byte[]> keysBytes;
+
+ /** Entry keys; not written by {@code writeTo()}, restored from {@code keysBytes} in {@code finishUnmarshal()}. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private List<K> keys;
+
+ /** Entry versions; {@code addEntry()} keeps this list index-aligned with the keys. */
+ @GridDirectCollection(GridCacheVersion.class)
+ private List<GridCacheVersion> vers;
+
+ /** New TTL for the listed entries (asserted non-negative by the public constructor). */
+ private long ttl;
+
+ /**
+ * Required empty constructor; also used by {@code clone()} to create the copy.
+ */
+ public GridCacheTtlUpdateRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param ttl New TTL to apply; must be non-negative (checked by assertion only).
+ */
+ public GridCacheTtlUpdateRequest(long ttl) {
+ assert ttl >= 0 : ttl;
+
+ this.ttl = ttl;
+ }
+
+ /**
+ * @param key Entry key.
+ * @param keyBytes Marshalled form of the key (stored as-is, not validated here).
+ * @param ver Entry version.
+ */
+ public void addEntry(K key, byte[] keyBytes, GridCacheVersion ver) {
+ if (keys == null) { // All three lists are created lazily on the first added entry.
+ keys = new ArrayList<>();
+
+ keysBytes = new ArrayList<>();
+
+ vers = new ArrayList<>();
+ }
+
+ keys.add(key);
+
+ keysBytes.add(keyBytes);
+
+ vers.add(ver);
+ }
+
+ /**
+ * @return Entry keys, or {@code null} if no entry was added and no key bytes were unmarshalled.
+ */
+ public List<K> keys() {
+ return keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr)
+ throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (keys == null && keysBytes != null) // Keys already present are kept; otherwise rebuild them from the bytes.
+ keys = unmarshalCollection(keysBytes, ctx, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 82;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("CloneDoesntCallSuperClone")
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridCacheTtlUpdateRequest _clone = new GridCacheTtlUpdateRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
+ if (!commState.putByte(directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) { // idx selects the next field to write; cases fall through so later fields follow in order.
+ case 3:
+ if (keysBytes != null) {
+ if (commState.it == null) {
+ if (!commState.putInt(keysBytes.size()))
+ return false;
+
+ commState.it = keysBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
+ if (!commState.putByteArray((byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
+ if (!commState.putInt(-1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 4:
+ if (!commState.putLong(ttl))
+ return false;
+
+ commState.idx++;
+
+ case 5:
+ if (vers != null) {
+ if (commState.it == null) {
+ if (!commState.putInt(vers.size()))
+ return false;
+
+ commState.it = vers.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
+ if (!commState.putCacheVersion((GridCacheVersion)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
+ if (!commState.putInt(-1))
+ return false;
+ }
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) { // Mirrors writeTo(): fields are read in the same order (keysBytes, ttl, vers).
+ case 3:
+ if (commState.readSize == -1) {
+ if (buf.remaining() < 4)
+ return false;
+
+ commState.readSize = commState.getInt();
+ }
+
+ if (commState.readSize >= 0) {
+ if (keysBytes == null)
+ keysBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
+ byte[] _val = commState.getByteArray();
+
+ if (_val == BYTE_ARR_NOT_READ)
+ return false;
+
+ keysBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 4:
+ if (buf.remaining() < 8)
+ return false;
+
+ ttl = commState.getLong();
+
+ commState.idx++;
+
+ case 5:
+ if (commState.readSize == -1) {
+ if (buf.remaining() < 4)
+ return false;
+
+ commState.readSize = commState.getInt();
+ }
+
+ if (commState.readSize >= 0) {
+ if (vers == null)
+ vers = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
+ GridCacheVersion _val = commState.getCacheVersion();
+
+ if (_val == CACHE_VER_NOT_READ)
+ return false;
+
+ vers.add((GridCacheVersion)_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridCacheTtlUpdateRequest _clone = (GridCacheTtlUpdateRequest)_msg;
+
+ _clone.keysBytes = keysBytes; // List references are shared with the original, not copied.
+ _clone.keys = keys;
+ _clone.vers = vers;
+ _clone.ttl = ttl;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheTtlUpdateRequest.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index c13f8f4..3557d17 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -92,6 +92,24 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
}
/** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ super.start();
+
+ ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest<K, V> req) {
+ processTtlUpdateRequest(req); // nodeId is not used; only the request is passed on.
+ }
+ });
+ }
+
+ /**
+ * @param req TTL update request. NOTE(review): it is only logged here; no entry TTL is changed by this method.
+ */
+ private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) {
+ log.info("Ttl update: " + req);
+ }
+
+ /** {@inheritDoc} */
@Override public void stop() {
super.stop();
@@ -470,11 +488,26 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @param filter Optional filter.
* @return DHT future.
*/
- public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader, long msgId,
- LinkedHashMap<? extends K, Boolean> keys, boolean reload, long topVer, @Nullable UUID subjId,
- int taskNameHash, boolean deserializePortable, IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
- GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, msgId, reader, keys, reload, /*tx*/null,
- topVer, filter, subjId, taskNameHash, deserializePortable);
+ public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader,
+ long msgId,
+ LinkedHashMap<? extends K, Boolean> keys,
+ boolean reload,
+ long topVer,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ boolean deserializePortable,
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+ GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx,
+ msgId,
+ reader,
+ keys,
+ reload,
+ /*tx*/null,
+ topVer,
+ filter,
+ subjId,
+ taskNameHash,
+ deserializePortable);
fut.init();
@@ -489,13 +522,22 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
assert isAffinityNode(cacheCfg);
IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut =
- getDhtAsync(nodeId, req.messageId(), req.keys(), req.reload(), req.topologyVersion(), req.subjectId(),
- req.taskNameHash(), false, req.filter());
+ getDhtAsync(nodeId,
+ req.messageId(),
+ req.keys(),
+ req.reload(),
+ req.topologyVersion(),
+ req.subjectId(),
+ req.taskNameHash(),
+ false,
+ req.filter());
fut.listenAsync(new CI1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>>() {
@Override public void apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> f) {
GridNearGetResponse<K, V> res = new GridNearGetResponse<>(ctx.cacheId(),
- req.futureId(), req.miniId(), req.version());
+ req.futureId(),
+ req.miniId(),
+ req.version());
GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut =
(GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>>)f;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 7cee7d9..dfbe4fa 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -971,7 +971,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
CU.subjectId(tx, ctx.shared()),
null,
tx != null ? tx.resolveTaskName() : null,
- CU.<K, V>empty());
+ CU.<K, V>empty(),
+ null);
assert e.lockedBy(mappedVer) ||
(ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) :
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index f79bc2f..5f6af05 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -293,8 +293,15 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
// If this is the primary or backup node for the keys.
if (n.isLocal()) {
final GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut =
- cache().getDhtAsync(n.id(), -1, mappedKeys, reload, topVer, subjId,
- taskName == null ? 0 : taskName.hashCode(), deserializePortable, filters);
+ cache().getDhtAsync(n.id(),
+ -1,
+ mappedKeys,
+ reload,
+ topVer,
+ subjId,
+ taskName == null ? 0 : taskName.hashCode(),
+ deserializePortable,
+ filters);
final Collection<Integer> invalidParts = fut.invalidPartitions();
@@ -405,7 +412,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
subjId,
null,
taskName,
- filters);
+ filters,
+ null);
colocated.context().evicts().touch(entry, topVer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d1deed4..6287e16 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -16,6 +16,7 @@ import org.apache.ignite.plugin.security.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.managers.communication.*;
import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*;
import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
@@ -141,6 +142,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings({"IfMayBeConditional", "SimplifiableIfStatement"})
@Override public void start() throws IgniteCheckedException {
+ super.start();
+
resetMetrics();
preldr = new GridDhtPreloader<>(ctx);
@@ -258,13 +261,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean deserializePortable,
@Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter
) {
- subjId = ctx.subjectIdPerCall(subjId);
+ GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+
+ subjId = ctx.subjectIdPerCall(null, prj);
final UUID subjId0 = subjId;
+ final ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
+
return asyncOp(new CO<IgniteFuture<Map<K, V>>>() {
@Override public IgniteFuture<Map<K, V>> apply() {
- return getAllAsync0(keys, false, forcePrimary, filter, subjId0, taskName, deserializePortable);
+ return getAllAsync0(keys,
+ false,
+ forcePrimary,
+ filter,
+ expiryPlc,
+ subjId0,
+ taskName,
+ deserializePortable);
}
});
}
@@ -595,7 +609,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
- UUID subjId = ctx.subjectIdPerCall(null); // TODO IGNITE-41.
+ UUID subjId = ctx.subjectIdPerCall(null, prj);
int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
@@ -691,10 +705,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param reload Reload flag.
* @param forcePrimary Force primary flag.
* @param filter Filter.
+ * @param expiryPlc Expiry policy.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param deserializePortable Deserialize portable flag.
* @return Get future.
*/
- private IgniteFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys, boolean reload,
- boolean forcePrimary, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, UUID subjId, String taskName,
+ private IgniteFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys,
+ boolean reload,
+ boolean forcePrimary,
+ @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+ @Nullable ExpiryPolicy expiryPlc,
+ UUID subjId,
+ String taskName,
boolean deserializePortable) {
ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
@@ -712,6 +735,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean success = true;
+ final GridCacheAccessExpiryPolicy expiry =
+ GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+
// Optimistically expect that all keys are available locally (avoid creation of get future).
for (K key : keys) {
GridCacheEntryEx<K, V> entry = null;
@@ -735,7 +761,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
subjId,
null,
taskName,
- filter);
+ filter,
+ expiry);
// Entry was not in memory or in swap, so we remove it from cache.
if (v == null) {
@@ -785,13 +812,42 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
break;
}
- if (success)
+ if (success) {
+ if (expiry != null && expiry.request() != null) {
+ ctx.closures().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ GridCacheTtlUpdateRequest<K, V> req = expiry.request();
+
+ assert !F.isEmpty(req.keys());
+
+ Collection<ClusterNode> nodes = ctx.affinity().remoteNodes(req.keys(), -1);
+
+ req.cacheId(ctx.cacheId());
+
+ ctx.io().safeSend(nodes, req, null);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed to send TTL update request.", e);
+ }
+ }
+ });
+ }
+
return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
+ }
}
// Either reload or not all values are available locally.
- GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, keys, topVer, reload, forcePrimary,
- filter, subjId, taskName, deserializePortable);
+ GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
+ keys,
+ topVer,
+ reload,
+ forcePrimary,
+ filter,
+ subjId,
+ taskName,
+ deserializePortable);
fut.init();
@@ -1066,7 +1122,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.subjectId(),
transform,
taskName,
- CU.<K, V>empty());
+ CU.<K, V>empty(),
+ null);
if (transformMap == null)
transformMap = new HashMap<>();
@@ -1174,7 +1231,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.subjectId(),
null,
taskName,
- CU.<K, V>empty());
+ CU.<K, V>empty(),
+ null);
updated = (V)ctx.config().getInterceptor().onBeforePut(entry.key(), old, updated);
@@ -1207,7 +1265,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.subjectId(),
null,
taskName,
- CU.<K, V>empty());
+ CU.<K, V>empty(),
+ null);
IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove(
entry.key(), old);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 8e9f808..1052e1d 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -207,11 +207,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param keys Keys to load.
* @param reload Reload flag.
* @param forcePrimary Force get from primary node flag.
+ * @param topVer Topology version.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param deserializePortable Deserialize portable flag.
* @param filter Filter.
* @return Loaded values.
*/
- public IgniteFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys, boolean reload,
- boolean forcePrimary, long topVer, @Nullable UUID subjId, String taskName, boolean deserializePortable,
+ public IgniteFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys,
+ boolean reload,
+ boolean forcePrimary,
+ long topVer,
+ @Nullable UUID subjId,
+ String taskName,
+ boolean deserializePortable,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
if (keys == null || keys.isEmpty())
return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
@@ -248,7 +257,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
subjId,
null,
taskName,
- filter);
+ filter,
+ null);
// Entry was not in memory or in swap, so we remove it from cache.
if (v == null) {
@@ -301,8 +311,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}
// Either reload or not all values are available locally.
- GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, keys, topVer, reload, forcePrimary,
- filter, subjId, taskName, deserializePortable);
+ GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
+ keys,
+ topVer,
+ reload,
+ forcePrimary,
+ filter,
+ subjId,
+ taskName,
+ deserializePortable);
fut.init();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
index b785103..db1a058 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -259,8 +259,15 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
GridCacheTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (GridCacheTxLocalEx<K, V>)tx : null;
- GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, keys, reload, forcePrimary, txx, filter,
- subjId, taskName, deserializePortable);
+ GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
+ keys,
+ reload,
+ forcePrimary,
+ txx,
+ filter,
+ subjId,
+ taskName,
+ deserializePortable);
// init() will register future for responses if future has remote mappings.
fut.init();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
index 6e1f494..1f1de06 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -250,7 +250,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
* @param mapped Mappings to check for duplicates.
* @param topVer Topology version to map on.
*/
- private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, final long topVer) {
+ private void map(Collection<? extends K> keys,
+ Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped,
+ final long topVer) {
Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
if (affNodes.isEmpty()) {
@@ -402,7 +404,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
subjId,
null,
taskName,
- filters);
+ filters,
+ null);
ClusterNode primary = null;
@@ -427,7 +430,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
subjId,
null,
taskName,
- filters);
+ filters,
+ null);
// Entry was not in memory or in swap, so we remove it from cache.
if (v == null && isNew && entry.markObsoleteIfEmpty(ver))
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 433d199..6eda650 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -607,7 +607,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
subjId,
null,
taskName,
- filter);
+ filter,
+ null);
if (v != null)
vals.put(key, v);
@@ -924,7 +925,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
subjId,
transform,
taskName,
- CU.<K, V>empty());
+ CU.<K, V>empty(),
+ null);
V updated = transform.apply(old);
@@ -1004,7 +1006,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
subjId,
null,
taskName,
- CU.<K, V>empty());
+ CU.<K, V>empty(),
+ null);
val = ctx.config().getInterceptor().onBeforePut(entry.key(), old, val);
@@ -1034,7 +1037,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
subjId,
null,
taskName,
- CU.<K, V>empty());
+ CU.<K, V>empty(),
+ null);
IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove(
entry.key(), old);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
index 7c92065..b2ae55b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
@@ -37,7 +37,7 @@ import java.util.*;
*/
public class GridTcpCommunicationMessageFactory {
/** Common message producers. */
- private static final GridTcpCommunicationMessageProducer[] COMMON = new GridTcpCommunicationMessageProducer[82];
+ private static final GridTcpCommunicationMessageProducer[] COMMON = new GridTcpCommunicationMessageProducer[83];
/**
* Custom messages registry. Used for test purposes.
@@ -264,6 +264,9 @@ public class GridTcpCommunicationMessageFactory {
case 81:
return new GridJobExecuteRequestV2();
+ case 82:
+ return new GridCacheTtlUpdateRequest();
+
default:
assert false : "Invalid message type.";
@@ -274,7 +277,7 @@ public class GridTcpCommunicationMessageFactory {
20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,
40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
60, 61, 62, 63, 64, /* 65-72 - GGFS messages. */ 73, 74, 75, 76, 77, 78, 79,
- 80, 81);
+ 80, 81, 82);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index c8abd0e..93b0405 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -125,6 +125,37 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
/**
* @throws Exception If failed.
*/
+ public void testAccess() throws Exception {
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, 62_000L));
+
+ startGrids();
+
+ for (final Integer key : keys()) {
+ log.info("Test access [key=" + key + ']');
+
+ access(key);
+ }
+ }
+
+ /**
+ * @param key Key.
+ * @throws Exception If failed.
+ */
+ private void access(Integer key) throws Exception {
+ IgniteCache<Integer, Integer> cache = jcache();
+
+ cache.put(key, 1);
+
+ checkTtl(key, 60_000L);
+
+ assertEquals((Integer)1, cache.get(key));
+
+ checkTtl(key, 62_000L);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCreateUpdate() throws Exception {
factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
@@ -299,83 +330,6 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
checkTtl(key, 0L);
}
- public void _testPrimary() throws Exception {
- factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
-
- nearCache = true;
-
- boolean inTx = true;
-
- startGrids();
-
- IgniteCache<Integer, Integer> cache = jcache(0);
-
- GridCache<Integer, Object> cache0 = cache(0);
-
- Integer key = primaryKey(cache0);
-
- log.info("Create: " + key);
-
- GridCacheTx tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null;
-
- cache.put(key, 1);
-
- if (tx != null)
- tx.commit();
-
- checkTtl(key, 60_000);
-
- tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null;
-
- log.info("Update: " + key);
-
- cache.put(key, 2);
-
- if (tx != null)
- tx.commit();
-
- checkTtl(key, 61_000);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void _test1() throws Exception {
- factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
-
- nearCache = false;
-
- boolean inTx = true;
-
- startGrids();
-
- Collection<Integer> keys = keys();
-
- IgniteCache<Integer, Integer> cache = jcache(0);
-
- for (final Integer key : keys) {
- log.info("Test key1: " + key);
-
- GridCacheTx tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null;
-
- cache.put(key, 1);
-
- if (tx != null)
- tx.commit();
-
- log.info("Test key2: " + key);
-
- tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null;
-
- cache.put(key, 2);
-
- if (tx != null)
- tx.commit();
-
- log.info("Done");
- }
- }
-
/**
* @param key Key.
* @param txConcurrency Not null transaction concurrency mode if explicit transaction should be started.
@@ -436,7 +390,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
* @return Transaction.
*/
@Nullable private GridCacheTx startTx(@Nullable GridCacheTxConcurrency txConcurrency) {
- return txConcurrency == null ? null : ignite(0).transactions().txStart(txConcurrency, READ_COMMITTED);
+ return txConcurrency == null ? null : ignite(0).transactions().txStart(txConcurrency, REPEATABLE_READ);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index fd2d205..7f25b3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -16,7 +16,7 @@ import junit.framework.*;
*/
public class IgniteCacheExpiryPolicyTestSuite extends TestSuite {
/**
- * @return Cache API test suite.
+ * @return Cache Expiry Policy test suite.
* @throws Exception If failed.
*/
public static TestSuite suite() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
index 16e5b25..bbce8ca 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
@@ -402,7 +402,8 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
UUID subjId,
Object transformClo,
String taskName,
- IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+ @Nullable GridCacheAccessExpiryPolicy expiryPlc) {
return val;
}
[3/5] incubator-ignite git commit: # ignite-41
Posted by sb...@apache.org.
# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d84da7d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d84da7d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d84da7d0
Branch: refs/heads/ignite-41
Commit: d84da7d0846036eddf8f300c56735f7c1ab1e948
Parents: 688a2e7
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 17 12:16:32 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 17 13:52:23 2014 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheContext.java | 2 +-
.../processors/cache/GridCacheEntryEx.java | 4 +-
.../processors/cache/GridCacheIoManager.java | 2 +-
.../processors/cache/GridCacheMapEntry.java | 40 +-
.../processors/cache/GridCacheMessage.java | 9 +
.../processors/cache/GridCacheTxEntry.java | 56 +-
.../cache/GridCacheTxLocalAdapter.java | 47 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 12 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 3 +-
.../distributed/near/GridNearLockRequest.java | 8 +-
.../local/atomic/GridLocalAtomicCache.java | 65 +-
.../cache/IgniteCacheAbstractTest.java | 152 ++++
.../processors/cache/IgniteCacheTest.java | 124 ----
.../IgniteCacheAtomicExpiryPolicyTest.java | 41 ++
.../IgniteCacheAtomicLocalExpiryPolicyTest.java | 41 ++
...teCacheAtomicReplicatedExpiryPolicyTest.java | 24 +
.../IgniteCacheExpiryPolicyAbstractTest.java | 714 +++++++++++++++++++
.../expiry/IgniteCacheExpiryPolicyTest.java | 507 -------------
.../IgniteCacheExpiryPolicyTestSuite.java | 35 +
.../expiry/IgniteCacheTxExpiryPolicyTest.java | 41 ++
.../IgniteCacheTxLocalExpiryPolicyTest.java | 41 ++
...IgniteCacheTxReplicatedExpiryPolicyTest.java | 26 +
.../processors/cache/GridCacheTestEntryEx.java | 15 +-
.../testframework/junits/GridAbstractTest.java | 8 +
.../junits/common/GridCommonAbstractTest.java | 25 +-
.../bamboo/GridDataGridTestSuite.java | 2 +-
26 files changed, 1343 insertions(+), 701 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
index 58fd1cd..931e243 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
@@ -283,7 +283,7 @@ public class GridCacheContext<K, V> implements Externalizable {
Factory<ExpiryPolicy> factory = cacheCfg.getExpiryPolicyFactory();
- expiryPlc = factory.create();
+ expiryPlc = factory != null ? factory.create() : null;
if (expiryPlc instanceof EternalExpiryPolicy)
expiryPlc = null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index fc8aaaa..76d73b3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -447,7 +447,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
* @param writeObj Value. Type depends on operation.
* @param writeThrough Write through flag.
* @param retval Return value flag.
- * @param ttl Time to live.
+ * @param expiryPlc Expiry policy.
* @param evt Event flag.
* @param metrics Metrics update flag.
* @param filter Optional filter to check.
@@ -464,7 +464,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
@Nullable Object writeObj,
boolean writeThrough,
boolean retval,
- long ttl,
+ @Nullable ExpiryPolicy expiryPlc,
boolean evt,
boolean metrics,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
index a50e461..7320595 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
@@ -200,7 +200,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
unmarshall(nodeId, cacheMsg);
- log.info("Message: " + cacheMsg);
+ //log.info("Message: " + cacheMsg);
if (cacheMsg.allowForStartup())
processMessage(nodeId, cacheMsg, c);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index 2c377e7..879f796 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -1112,7 +1112,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
@Nullable UUID subjId,
String taskName
) throws IgniteCheckedException, GridCacheEntryRemovedException {
- log.info("Inner set " + key + " " + val + " " + ttl);
+ // log.info("Inner set " + key + " " + val + " " + ttl);
V old;
@@ -1432,7 +1432,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
@Nullable Object writeObj,
boolean writeThrough,
boolean retval,
- long ttl,
+ @Nullable ExpiryPolicy expiryPlc,
boolean evt,
boolean metrics,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@@ -1456,13 +1456,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
if (isNew())
unswap(true, retval);
- long newTtl = ttl;
-
- if (newTtl < 0)
- newTtl = ttlExtras();
-
- long newExpireTime = toExpireTime(newTtl);
-
// Possibly get old value form store.
old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
@@ -1541,11 +1534,36 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
// Must persist inside synchronization in non-tx mode.
cctx.store().putToStore(null, key, updated, ver);
+ long ttl;
+ long expireTime;
+
+ if (expiryPlc != null) {
+ if (!hadVal) {
+ Duration duration = expiryPlc.getExpiryForCreation();
+
+ if (duration != null && duration.isZero())
+ return new IgniteBiTuple<>(false, cctx.<V>unwrapTemporary(old));
+
+ ttl = toTtl(duration);
+ }
+ else
+ ttl = toTtl(expiryPlc.getExpiryForUpdate());
+
+ ttl = ttl < 0 ? ttlExtras() : ttl;
+
+ expireTime = toExpireTime(ttl);
+ }
+ else {
+ ttl = ttlExtras();
+
+ expireTime = toExpireTime(ttl);
+ }
+
// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
- updateIndex(updated, null, newExpireTime, ver, old);
+ updateIndex(updated, null, expireTime, ver, old);
- update(updated, null, newExpireTime, newTtl, ver);
+ update(updated, null, expireTime, ttl, ver);
if (evt) {
V evtOld = null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java
index 71eac41..882fc5a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java
@@ -330,7 +330,12 @@ public abstract class GridCacheMessage<K, V> extends GridTcpCommunicationMessage
assert ctx != null;
if (txEntries != null) {
+ boolean transferExpiry = transferExpiryPolicy();
+
for (GridCacheTxEntry<K, V> e : txEntries) {
+ if (transferExpiry)
+ e.transferExpiryPolicyIfNeeded();
+
e.marshal(ctx);
if (ctx.deploymentEnabled()) {
@@ -342,6 +347,10 @@ public abstract class GridCacheMessage<K, V> extends GridTcpCommunicationMessage
}
}
+ protected boolean transferExpiryPolicy() {
+ return false;
+ }
+
/**
* @param txEntries Entries to unmarshal.
* @param ctx Context.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
index 91b9cc0..df888e7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
@@ -13,12 +13,14 @@ import org.apache.ignite.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.optimized.*;
import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
import org.gridgain.grid.util.lang.*;
import org.gridgain.grid.util.tostring.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.*;
@@ -128,6 +130,12 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab
/** Data center replication version. */
private GridCacheVersion drVer;
+ /** Expiry policy. */
+ private ExpiryPolicy expiryPlc;
+
+ /** */
+ private boolean transferExpiryPlc;
+
/**
* Required by {@link Externalizable}
*/
@@ -147,8 +155,14 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab
* @param entry Cache entry.
* @param drVer Data center replication version.
*/
- public GridCacheTxEntry(GridCacheContext<K, V> ctx, GridCacheTxEx<K, V> tx, GridCacheOperation op, V val,
- long ttl, long drExpireTime, GridCacheEntryEx<K, V> entry, @Nullable GridCacheVersion drVer) {
+ public GridCacheTxEntry(GridCacheContext<K, V> ctx,
+ GridCacheTxEx<K, V> tx,
+ GridCacheOperation op,
+ V val,
+ long ttl,
+ long drExpireTime,
+ GridCacheEntryEx<K, V> entry,
+ @Nullable GridCacheVersion drVer) {
assert ctx != null;
assert tx != null;
assert op != null;
@@ -183,9 +197,15 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab
* @param filters Put filters.
* @param drVer Data center replication version.
*/
- public GridCacheTxEntry(GridCacheContext<K, V> ctx, GridCacheTxEx<K, V> tx, GridCacheOperation op,
- V val, IgniteClosure<V, V> transformClos, long ttl, GridCacheEntryEx<K,V> entry,
- IgnitePredicate<GridCacheEntry<K, V>>[] filters, GridCacheVersion drVer) {
+ public GridCacheTxEntry(GridCacheContext<K, V> ctx,
+ GridCacheTxEx<K, V> tx,
+ GridCacheOperation op,
+ V val,
+ IgniteClosure<V, V> transformClos,
+ long ttl,
+ GridCacheEntryEx<K,V> entry,
+ IgnitePredicate<GridCacheEntry<K, V>>[] filters,
+ GridCacheVersion drVer) {
assert ctx != null;
assert tx != null;
assert op != null;
@@ -285,6 +305,7 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab
cp.grpLock = grpLock;
cp.depEnabled = depEnabled;
cp.drVer = drVer;
+ cp.expiryPlc = expiryPlc;
return cp;
}
@@ -708,6 +729,13 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab
}
/**
+ * Marks expiry policy for transfer if it is explicitly set and differs from the default one.
+ */
+ public void transferExpiryPolicyIfNeeded() {
+ transferExpiryPlc = expiryPlc != null && expiryPlc != ctx.expiry();
+ }
+
+ /**
* @param ctx Context.
* @throws IgniteCheckedException If failed.
*/
@@ -768,6 +796,20 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab
val.unmarshal(this.ctx, clsLdr, depEnabled);
}
+ /**
+ * @param expiryPlc Expiry policy.
+ */
+ public void expiry(@Nullable ExpiryPolicy expiryPlc) {
+ this.expiryPlc = expiryPlc;
+ }
+
+ /**
+ * @return Expiry policy.
+ */
+ @Nullable public ExpiryPolicy expiry() {
+ return expiryPlc;
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeBoolean(depEnabled);
@@ -793,6 +835,8 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab
CU.writeVersion(out, explicitVer);
out.writeBoolean(grpLock);
CU.writeVersion(out, drVer);
+
+ out.writeObject(transferExpiryPlc ? new GridCacheExpiryPolicy(expiryPlc) : null);
}
/** {@inheritDoc} */
@@ -821,6 +865,8 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab
explicitVer = CU.readVersion(in);
grpLock = in.readBoolean();
drVer = CU.readVersion(in);
+
+ expiryPlc = (ExpiryPolicy)in.readObject();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index c6888f1..7b6f266 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -653,7 +653,10 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
byte[] valBytes = res.get3();
if (op == CREATE || op == UPDATE && txEntry.drExpireTime() == -1L) {
- ExpiryPolicy expiry = cacheCtx.expiry();
+ ExpiryPolicy expiry = txEntry.expiry();
+
+ if (expiry == null)
+ expiry = cacheCtx.expiry();
if (expiry != null) {
Duration duration = cached.hasValue() ?
@@ -661,7 +664,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
txEntry.ttl(GridCacheMapEntry.toTtl(duration));
- log.info("Calculated expiry (userCommit), update=" + cached.hasValue() + ", ttl=" + txEntry.ttl() + ", detached=" + cached.detached());
+ log.info("Calculated expiry (userCommit), update=" + cached.hasValue() + ", ttl=" + txEntry.ttl() + ", plc=" + expiry);
}
}
@@ -2684,7 +2687,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
@Nullable V val,
@Nullable IgniteClosure<V, V> transformClos,
GridCacheEntryEx<K, V> entry,
- @Nullable ExpiryPolicy expiryPlc, // TODO IGNITE-41
+ @Nullable ExpiryPolicy expiryPlc,
IgnitePredicate<GridCacheEntry<K, V>>[] filter,
boolean filtersSet,
long drTtl,
@@ -2739,7 +2742,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
entryTtlDr(key, drTtl, drExpireTime);
}
else
- entryTtl(key, ttl);
+ entryExpiry(key, expiryPlc);
txEntry = old;
@@ -2747,15 +2750,23 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
log.debug("Updated transaction entry: " + txEntry);
}
else {
- long ttl = -1L;
-
- if (drTtl >= 0L)
- ttl = drTtl;
-
- txEntry = new GridCacheTxEntry<>(entry.context(), this, op, val, transformClos, ttl, entry, filter, drVer);
+ boolean hasDrTtl = drTtl >= 0;
+
+ txEntry = new GridCacheTxEntry<>(entry.context(),
+ this,
+ op,
+ val,
+ transformClos,
+ hasDrTtl ? drTtl : -1L,
+ entry,
+ filter,
+ drVer);
txEntry.drExpireTime(drExpireTime);
+ if (!hasDrTtl)
+ txEntry.expiry(expiryPlc);
+
txMap.put(key, txEntry);
if (log.isDebugEnabled())
@@ -2844,6 +2855,19 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
/**
* @param key Key.
+ * @param expiryPlc Expiry policy.
+ */
+ void entryExpiry(GridCacheTxKey<K> key, @Nullable ExpiryPolicy expiryPlc) {
+ assert key != null;
+
+ GridCacheTxEntry<K, V> e = entry(key);
+
+ if (e != null)
+ e.expiry(expiryPlc);
+ }
+
+ /**
+ * @param key Key.
* @param ttl TTL.
* @param expireTime Expire time.
* @return {@code true} if tx entry exists for this key, {@code false} otherwise.
@@ -2856,7 +2880,10 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
if (e != null) {
e.ttl(ttl);
+
e.drExpireTime(expireTime);
+
+ e.expiry(null);
}
return e != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 3f30801..7cee7d9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -692,8 +692,16 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
GridDhtLockFuture<K, V> fut = null;
if (!req.inTx()) {
- fut = new GridDhtLockFuture<>(ctx, nearNode.id(), req.version(),
- req.topologyVersion(), cnt, req.txRead(), req.timeout(), tx, req.threadId(), filter);
+ fut = new GridDhtLockFuture<>(ctx,
+ nearNode.id(),
+ req.version(),
+ req.topologyVersion(),
+ cnt,
+ req.txRead(),
+ req.timeout(),
+ tx,
+ req.threadId(),
+ filter);
// Add before mapping.
if (!ctx.mvcc().addFuture(fut))
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 461ea04..44c397a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -512,7 +512,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte
GridCacheTxEntry<K, V> w = writeEntries == null ? null : writeEntries.get(idx++);
- txEntry = addEntry(NOOP, null, null, cached, -1, CU.<K, V>empty(), false, -1L, -1L,
+ txEntry = addEntry(NOOP, null, null, cached, null, CU.<K, V>empty(), false, -1L, -1L,
drVers != null ? drVers[drVerIdx++] : null);
if (w != null) {
@@ -526,6 +526,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte
txEntry.ttl(w.ttl());
txEntry.filters(w.filters());
txEntry.drExpireTime(w.drExpireTime());
+ txEntry.expiry(w.expiry());
}
txEntry.cached(cached, txEntry.keyBytes());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java
index abcc189..4a1d501 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -284,8 +284,12 @@ public class GridNearLockRequest<K, V> extends GridDistributedLockRequest<K, V>
return dhtVers[idx];
}
- /** {@inheritDoc}
- * @param ctx*/
+ /** {@inheritDoc} */
+ @Override protected boolean transferExpiryPolicy() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index e4d6b00..433d199 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -22,6 +22,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import sun.misc.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -103,7 +104,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (V)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
- ttl,
+ expiryPerCall(),
true,
false,
filter,
@@ -124,7 +125,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
- ttl,
+ expiryPerCall(),
false,
false,
filter,
@@ -142,7 +143,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
- -1,
+ expiryPerCall(),
false,
false,
filter,
@@ -239,7 +240,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (GridCacheReturn<V>)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(newVal),
- 0,
+ expiryPerCall(),
true,
true,
ctx.equalsPeekArray(oldVal),
@@ -256,7 +257,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (GridCacheReturn<V>)updateAllInternal(DELETE,
Collections.singleton(key),
null,
- 0,
+ null,
true,
true,
ctx.equalsPeekArray(val),
@@ -292,7 +293,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
updateAllInternal(UPDATE,
m.keySet(),
m.values(),
- 0,
+ expiryPerCall(),
false,
false,
filter,
@@ -314,7 +315,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
updateAllInternal(TRANSFORM,
Collections.singleton(key),
Collections.singleton(transformer),
- -1,
+ expiryPerCall(),
false,
false,
null,
@@ -328,7 +329,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (R)updateAllInternal(TRANSFORM,
Collections.singleton(key),
Collections.singleton(new GridCacheTransformComputeClosure<>(transformer)),
- -1,
+ expiryPerCall(),
true,
false,
null,
@@ -356,7 +357,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
updateAllInternal(TRANSFORM,
m.keySet(),
m.values(),
- 0,
+ expiryPerCall(),
false,
false,
null,
@@ -383,7 +384,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (V)updateAllInternal(DELETE,
Collections.singleton(key),
null,
- 0,
+ null,
true,
false,
filter,
@@ -409,7 +410,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
updateAllInternal(DELETE,
keys,
null,
- 0,
+ null,
false,
false,
filter,
@@ -436,7 +437,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(DELETE,
Collections.singleton(key),
null,
- 0,
+ null,
false,
false,
filter,
@@ -464,7 +465,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(DELETE,
Collections.singleton(key),
null,
- 0,
+ null,
false,
false,
ctx.equalsPeekArray(val),
@@ -678,13 +679,14 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : null;
final Collection<?> vals = map != null ? map.values() : transformMap != null ? transformMap.values() : null;
final boolean storeEnabled = ctx.isStoreEnabled();
+ final ExpiryPolicy expiry = expiryPerCall();
return asyncOp(new Callable<Object>() {
@Override public Object call() throws Exception {
return updateAllInternal(op,
keys,
vals,
- ttl,
+ expiry,
retval,
rawRetval,
filter,
@@ -715,7 +717,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return updateAllInternal(DELETE,
keys,
null,
- 0,
+ null,
retval,
rawRetval,
filter,
@@ -730,7 +732,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
* @param op Operation.
* @param keys Keys.
* @param vals Values.
- * @param ttl Time to live.
+ * @param expiryPlc Expiry policy.
* @param retval Return value required flag.
* @param rawRetval Return {@code GridCacheReturn} instance.
* @param filter Cache entry filter.
@@ -742,7 +744,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
private Object updateAllInternal(GridCacheOperation op,
Collection<? extends K> keys,
@Nullable Iterable<?> vals,
- long ttl,
+ @Nullable ExpiryPolicy expiryPlc,
boolean retval,
boolean rawRetval,
IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@@ -762,7 +764,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
UUID subjId = ctx.subjectIdPerCall(null);
if (storeEnabled && keys.size() > 1) {
- updateWithBatch(op, keys, vals, ver, filter, subjId, taskName);
+ updateWithBatch(op, keys, vals, expiryPlc, ver, filter, subjId, taskName);
return null;
}
@@ -793,7 +795,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
val,
storeEnabled,
retval,
- ttl,
+ expiryPlc,
true,
true,
filter,
@@ -860,6 +862,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
GridCacheOperation op,
Collection<? extends K> keys,
@Nullable Iterable<?> vals,
+ @Nullable ExpiryPolicy expiryPlc,
GridCacheVersion ver,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
UUID subjId,
@@ -941,6 +944,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ver,
putMap,
null,
+ expiryPlc,
err,
subjId,
taskName);
@@ -971,6 +975,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ver,
null,
rmvKeys,
+ expiryPlc,
err,
subjId,
taskName);
@@ -1067,6 +1072,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ver,
putMap,
rmvKeys,
+ expiryPlc,
err,
subjId,
taskName);
@@ -1087,16 +1093,19 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
* @param ver Cache version.
* @param putMap Values to put.
* @param rmvKeys Keys to remove.
+ * @param expiryPlc Expiry policy.
* @param err Optional partial update exception.
* @param subjId Subject ID.
* @param taskName Task name.
* @return Partial update exception.
*/
@SuppressWarnings({"unchecked", "ConstantConditions", "ForLoopReplaceableByForEach"})
- @Nullable private GridCachePartialUpdateException updatePartialBatch(List<GridCacheEntryEx<K, V>> entries,
+ @Nullable private GridCachePartialUpdateException updatePartialBatch(
+ List<GridCacheEntryEx<K, V>> entries,
final GridCacheVersion ver,
@Nullable Map<K, V> putMap,
@Nullable Collection<K> rmvKeys,
+ @Nullable ExpiryPolicy expiryPlc,
@Nullable GridCachePartialUpdateException err,
UUID subjId,
String taskName
@@ -1151,7 +1160,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
writeVal,
false,
false,
- 0,
+ expiryPlc,
true,
true,
null,
@@ -1275,6 +1284,20 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
}
/**
+ * @return Expiry policy of the per-call projection if it has one, otherwise {@code ctx.expiry()} (may be {@code null}).
+ */
+ @Nullable private ExpiryPolicy expiryPerCall() {
+ GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+
+ ExpiryPolicy expiry = prj != null ? prj.expiry() : null;
+
+ if (expiry == null)
+ expiry = ctx.expiry();
+
+ return expiry;
+ }
+
+ /**
* @param op Operation closure.
* @return Future.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
new file mode 100644
index 0000000..9293431
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
@@ -0,0 +1,152 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.testframework.junits.common.*;
+
+import static org.gridgain.grid.cache.GridCacheMode.*;
+import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ * Base class for cache tests: starts {@link #gridCount()} grids before the tests and stops all grids afterwards.
+ */
+public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest {
+ /** IP finder handed to the discovery SPI of every grid configured by this class. */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /**
+ * @return Grids count to start.
+ */
+ protected abstract int gridCount();
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ protected void startGrids() throws Exception {
+ int cnt = gridCount();
+
+ assert cnt >= 1 : "At least one grid must be started";
+
+ startGridsMultiThreaded(cnt);
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setMaxMissedHeartbeats(Integer.MAX_VALUE);
+
+ disco.setIpFinder(ipFinder);
+
+ if (isDebug())
+ disco.setAckTimeout(Integer.MAX_VALUE);
+
+ cfg.setDiscoverySpi(disco);
+
+ cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+ cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+ return cfg;
+ }
+
+ /**
+ * @param gridName Grid name.
+ * @return Cache configuration.
+ * @throws Exception In case of error.
+ */
+ protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ GridCacheConfiguration cfg = defaultCacheConfiguration();
+
+ cfg.setSwapEnabled(swapEnabled());
+ cfg.setCacheMode(cacheMode());
+ cfg.setAtomicityMode(atomicityMode());
+ cfg.setWriteSynchronizationMode(writeSynchronization());
+ cfg.setDistributionMode(distributionMode());
+ cfg.setPortableEnabled(portableEnabled());
+
+ if (cacheMode() == PARTITIONED)
+ cfg.setBackups(1);
+
+ return cfg;
+ }
+
+ /**
+ * @return Cache mode.
+ */
+ protected abstract GridCacheMode cacheMode();
+
+ /**
+ * @return Cache atomicity mode.
+ */
+ protected abstract GridCacheAtomicityMode atomicityMode();
+
+ /**
+ * @return Cache distribution mode.
+ */
+ protected abstract GridCacheDistributionMode distributionMode();
+
+ /**
+ * @return Write synchronization mode ({@code FULL_SYNC} unless overridden).
+ */
+ protected GridCacheWriteSynchronizationMode writeSynchronization() {
+ return FULL_SYNC;
+ }
+
+ /**
+ * @return Whether portable mode is enabled.
+ */
+ protected boolean portableEnabled() {
+ return false;
+ }
+
+ /**
+ * @return {@code true} if swap should be enabled.
+ */
+ protected boolean swapEnabled() {
+ return false;
+ }
+
+ /**
+ * @return Cache of the first grid (index 0).
+ */
+ protected <K, V> IgniteCache<K, V> jcache() {
+ return jcache(0);
+ }
+
+ /**
+ * @param idx Grid index.
+ * @return Cache obtained from the grid with the given index using a {@code null} cache name.
+ */
+ protected <K, V> IgniteCache<K, V> jcache(int idx) {
+ return grid(idx).jcache(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
deleted file mode 100644
index ef0b4c3..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.testframework.junits.common.*;
-
-/**
- *
- */
-public class IgniteCacheTest extends GridCommonAbstractTest {
- /** */
- private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
- /**
- * @return Grids count to start.
- */
- protected int gridCount() {
- return 2;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- startGrids();
- }
-
- /**
- * @throws Exception If failed.
- */
- protected void startGrids() throws Exception {
- int cnt = gridCount();
-
- assert cnt >= 1 : "At least one grid must be started";
-
- startGridsMultiThreaded(cnt);
-
- awaitPartitionMapExchange();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
- disco.setMaxMissedHeartbeats(Integer.MAX_VALUE);
-
- disco.setIpFinder(ipFinder);
-
- if (isDebug())
- disco.setAckTimeout(Integer.MAX_VALUE);
-
- cfg.setDiscoverySpi(disco);
-
- cfg.setCacheConfiguration(cacheConfiguration(gridName));
-
- cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
-
- return cfg;
- }
-
- /**
- * @param gridName Grid name.
- * @return Cache configuration.
- * @throws Exception In case of error.
- */
- protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
- GridCacheConfiguration cfg = defaultCacheConfiguration();
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPutGetRemove() throws Exception {
- IgniteCache<Integer, String> cache = jcache();
-
- for (int i = 0; i < 10; i++)
- cache.put(i, String.valueOf(i));
-
- for (int i = 0; i < 10; i++)
- assertEquals(String.valueOf(i), cache.get(i));
-
- for (int i = 0; i < 10; i++)
- cache.remove(i);
-
- for (int i = 0; i < 10; i++)
- assertNull(cache.get(i));
- }
-
- /**
- * @return Cache.
- */
- protected <K, V> IgniteCache<K, V> jcache() {
- return jcache(0);
- }
-
- /**
- * @param idx Grid index.
- * @return Cache.
- */
- protected <K, V> IgniteCache<K, V> jcache(int idx) {
- return grid(idx).jcache(null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
new file mode 100644
index 0000000..251bc05
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
@@ -0,0 +1,41 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ * Expiry policy tests for an {@code ATOMIC}, {@code PARTITIONED} cache ({@code PARTITIONED_ONLY} distribution) on three grids.
+ */
+public class IgniteCacheAtomicExpiryPolicyTest extends IgniteCacheExpiryPolicyAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return PARTITIONED_ONLY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicLocalExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicLocalExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicLocalExpiryPolicyTest.java
new file mode 100644
index 0000000..20c9666
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicLocalExpiryPolicyTest.java
@@ -0,0 +1,41 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ * Expiry policy tests for an {@code ATOMIC}, {@code LOCAL} cache on a single grid.
+ */
+public class IgniteCacheAtomicLocalExpiryPolicyTest extends IgniteCacheExpiryPolicyAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return LOCAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return PARTITIONED_ONLY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicReplicatedExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicReplicatedExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicReplicatedExpiryPolicyTest.java
new file mode 100644
index 0000000..bca7705
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicReplicatedExpiryPolicyTest.java
@@ -0,0 +1,24 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ * Same tests as {@link IgniteCacheAtomicExpiryPolicyTest}, but run against a {@code REPLICATED} cache.
+ */
+public class IgniteCacheAtomicReplicatedExpiryPolicyTest extends IgniteCacheAtomicExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
new file mode 100644
index 0000000..c8abd0e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -0,0 +1,714 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.testframework.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.configuration.*;
+import javax.cache.expiry.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+import static org.gridgain.grid.cache.GridCacheTxConcurrency.*;
+import static org.gridgain.grid.cache.GridCacheTxIsolation.*;
+
+/**
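+ * Base class for cache expiry policy tests; subclasses choose the grid count, cache mode, atomicity and distribution mode.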
+ */
+public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbstractTest {
+ /** Expiry policy factory; assigned by each test before it starts the grids. */
+ private Factory<? extends ExpiryPolicy> factory;
+
+ /** Near cache flag; assigned by the tests that need it before they start the grids. */
+ private boolean nearCache;
+
+ /** Value passed to the key lookups in {@code keys()}; advanced to one past the largest key returned. */
+ private Integer lastKey = 0;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // No-op: every test assigns its own policy factory and calls startGrids() itself.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEternal() throws Exception {
+ factory = EternalExpiryPolicy.factoryOf();
+
+ startGrids();
+
+ for (final Integer key : keys()) {
+ log.info("Test eternalPolicy, key: " + key);
+
+ eternal(key);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNullFactory() throws Exception {
+ factory = null; // No factory configured: the same zero-TTL checks as for the eternal policy must pass.
+
+ startGrids();
+
+ for (final Integer key : keys()) {
+ log.info("Test nullFactory, key: " + key);
+
+ eternal(key);
+ }
+ }
+
+ /**
+ * @param key Key.
+ * @throws Exception If failed.
+ */
+ private void eternal(Integer key) throws Exception {
+ IgniteCache<Integer, Integer> cache = jcache();
+
+ cache.put(key, 1); // Create.
+
+ checkTtl(key, 0);
+
+ assertEquals((Integer) 1, cache.get(key)); // Get.
+
+ checkTtl(key, 0);
+
+ cache.put(key, 2); // Update.
+
+ checkTtl(key, 0);
+
+ assertTrue(cache.remove(key)); // Remove.
+
+ /*
+ cache.withExpiryPolicy(new TestPolicy(60_000L, null, null)).put(key, 1); // Create with custom.
+
+ checkTtl(key, 60_000L);
+
+ cache.put(key, 2); // Update.
+
+ checkTtl(key, 0);
+
+ cache.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 1);
+
+ checkTtl(key, 1000L);
+
+ waitExpired(key);
+ */
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCreateUpdate() throws Exception {
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
+
+ startGrids();
+
+ for (final Integer key : keys()) {
+ log.info("Test createUpdate [key=" + key + ']');
+
+ createUpdate(key, null);
+ }
+
+ for (final Integer key : keys()) {
+ log.info("Test createUpdateCustomPolicy [key=" + key + ']');
+
+ createUpdateCustomPolicy(key, null);
+ }
+
+ createUpdatePutAll(null);
+
+ GridCacheTxConcurrency[] txModes = {PESSIMISTIC};
+
+ if (atomicityMode() == TRANSACTIONAL) {
+ for (GridCacheTxConcurrency tx : txModes) {
+ for (final Integer key : keys()) {
+ log.info("Test createUpdate [key=" + key + ", tx=" + tx + ']');
+
+ createUpdate(key, tx);
+ }
+
+ for (final Integer key : keys()) {
+ log.info("Test createUpdateCustomPolicy [key=" + key + ", tx=" + tx + ']');
+
+ createUpdateCustomPolicy(key, tx);
+ }
+
+ createUpdatePutAll(tx);
+ }
+ }
+ }
+
+ /**
+ * @param txConcurrency Transaction concurrency mode, or {@code null} to run without an explicit transaction.
+ * @throws Exception If failed.
+ */
+ private void createUpdatePutAll(@Nullable GridCacheTxConcurrency txConcurrency) throws Exception {
+ Map<Integer, Integer> vals = new HashMap<>();
+
+ for (int i = 0; i < 1000; i++)
+ vals.put(i, i);
+
+ IgniteCache<Integer, Integer> cache = jcache(0);
+
+ cache.removeAll(vals.keySet());
+
+ GridCacheTx tx = startTx(txConcurrency);
+
+ // Create: keys were removed above, so the 60s TTL is expected.
+ cache.putAll(vals);
+
+ if (tx != null)
+ tx.commit();
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 60_000);
+
+ tx = startTx(txConcurrency);
+
+ // Update: keys already exist, so the 61s TTL is expected.
+ cache.putAll(vals);
+
+ if (tx != null)
+ tx.commit();
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 61_000);
+
+ tx = startTx(txConcurrency);
+
+ // Update with provided TTL.
+ cache.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals);
+
+ if (tx != null)
+ tx.commit();
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 1000L);
+
+ waitExpired(vals.keySet());
+
+ tx = startTx(txConcurrency);
+
+ // Try create again.
+ cache.putAll(vals);
+
+ if (tx != null)
+ tx.commit();
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 60_000L);
+
+ Map<Integer, Integer> newVals = new HashMap<>(vals);
+
+ newVals.put(100_000, 1);
+
+ // Updates existing keys and creates key 100_000; note that this putAll runs outside any explicit transaction.
+ cache.putAll(newVals);
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 61_000L);
+
+ checkTtl(100_000, 60_000L);
+
+ cache.removeAll(newVals.keySet());
+ }
+
+ /**
+ * @param key Key.
+ * @param txConcurrency Transaction concurrency mode, or {@code null} to run without an explicit transaction.
+ * @throws Exception If failed.
+ */
+ private void createUpdateCustomPolicy(Integer key, @Nullable GridCacheTxConcurrency txConcurrency)
+ throws Exception {
+ IgniteCache<Integer, Integer> cache = jcache();
+
+ assertNull(cache.get(key));
+
+ GridCacheTx tx = startTx(txConcurrency);
+
+ cache.withExpiryPolicy(new TestPolicy(10_000L, 20_000L, 30_000L)).put(key, 1);
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 10_000L);
+
+ for (int idx = 0; idx < gridCount(); idx++) {
+ assertEquals(1, cache(idx).get(key)); // Try get.
+
+ checkTtl(key, 10_000);
+ }
+
+ tx = startTx(txConcurrency);
+
+ // Update, returns null duration, should not change TTL.
+ cache.withExpiryPolicy(new TestPolicy(20_000L, null, null)).put(key, 2);
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 10_000L);
+
+ tx = startTx(txConcurrency);
+
+ // Update with provided TTL.
+ cache.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 2);
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 1000L);
+
+ waitExpired(key);
+
+ tx = startTx(txConcurrency);
+
+ // Create, returns null duration, should create with 0 TTL.
+ cache.withExpiryPolicy(new TestPolicy(null, 20_000L, 30_000L)).put(key, 1);
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 0L);
+ }
+
+ public void _testPrimary() throws Exception {
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
+
+ nearCache = true;
+
+ boolean inTx = true;
+
+ startGrids();
+
+ IgniteCache<Integer, Integer> cache = jcache(0);
+
+ GridCache<Integer, Object> cache0 = cache(0);
+
+ Integer key = primaryKey(cache0);
+
+ log.info("Create: " + key);
+
+ GridCacheTx tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null;
+
+ cache.put(key, 1);
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 60_000);
+
+ tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null;
+
+ log.info("Update: " + key);
+
+ cache.put(key, 2);
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 61_000);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void _test1() throws Exception {
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
+
+ nearCache = false;
+
+ boolean inTx = true;
+
+ startGrids();
+
+ Collection<Integer> keys = keys();
+
+ IgniteCache<Integer, Integer> cache = jcache(0);
+
+ for (final Integer key : keys) {
+ log.info("Test key1: " + key);
+
+ GridCacheTx tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null;
+
+ cache.put(key, 1);
+
+ if (tx != null)
+ tx.commit();
+
+ log.info("Test key2: " + key);
+
+ tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null;
+
+ cache.put(key, 2);
+
+ if (tx != null)
+ tx.commit();
+
+ log.info("Done");
+ }
+ }
+
+ /**
+ * @param key Key.
+ * @param txConcurrency Transaction concurrency mode, or {@code null} to run without an explicit transaction.
+ * @throws Exception If failed.
+ */
+ private void createUpdate(Integer key, @Nullable GridCacheTxConcurrency txConcurrency)
+ throws Exception {
+ IgniteCache<Integer, Integer> cache = jcache();
+
+ // Run several times to make sure create after remove works as expected.
+ for (int i = 0; i < 3; i++) {
+ log.info("Iteration: " + i);
+
+ GridCacheTx tx = startTx(txConcurrency);
+
+ cache.put(key, 1); // Create.
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 60_000);
+
+ for (int idx = 0; idx < gridCount(); idx++) {
+ assertEquals(1, cache(idx).get(key)); // Try get.
+
+ checkTtl(key, 60_000);
+ }
+
+ tx = startTx(txConcurrency);
+
+ cache.put(key, 2); // Update.
+
+ if (tx != null)
+ tx.commit();
+
+ checkTtl(key, 61_000);
+
+ for (int idx = 0; idx < gridCount(); idx++) {
+ assertEquals(2, cache(idx).get(key)); // Try get.
+
+ checkTtl(key, 61_000);
+ }
+
+ tx = startTx(txConcurrency);
+
+ assertTrue(cache.remove(key));
+
+ if (tx != null)
+ tx.commit();
+
+ for (int idx = 0; idx < gridCount(); idx++)
+ assertNull(cache(idx).get(key));
+ }
+ }
+
+ /**
+ * @param txConcurrency Transaction concurrency mode, or {@code null} if no transaction should be started.
+ * @return Transaction started on grid 0 with {@code READ_COMMITTED} isolation, or {@code null} if {@code txConcurrency} is {@code null}.
+ */
+ @Nullable private GridCacheTx startTx(@Nullable GridCacheTxConcurrency txConcurrency) {
+ return txConcurrency == null ? null : ignite(0).transactions().txStart(txConcurrency, READ_COMMITTED);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNearCreateUpdate() throws Exception {
+ if (cacheMode() != PARTITIONED)
+ return;
+
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
+
+ nearCache = true;
+
+ startGrids();
+
+ Integer key = nearKey(cache(0));
+
+ IgniteCache<Integer, Integer> jcache0 = jcache(0);
+
+ jcache0.put(key, 1);
+
+ checkTtl(key, 60_000);
+
+ IgniteCache<Integer, Integer> jcache1 = jcache(1);
+
+ // Update from another node.
+ jcache1.put(key, 2);
+
+ checkTtl(key, 61_000L);
+
+ // Update from another node with provided TTL.
+ jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 3);
+
+ checkTtl(key, 1000);
+
+ waitExpired(key);
+
+ // Try create again.
+ jcache0.put(key, 1);
+
+ checkTtl(key, 60_000);
+
+ // Update from near node with provided TTL.
+ jcache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2);
+
+ checkTtl(key, 1100);
+
+ waitExpired(key);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNearPutAll() throws Exception {
+ if (cacheMode() != PARTITIONED)
+ return;
+
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
+
+ nearCache = true;
+
+ startGrids();
+
+ Map<Integer, Integer> vals = new HashMap<>();
+
+ for (int i = 0; i < 1000; i++)
+ vals.put(i, i);
+
+ IgniteCache<Integer, Integer> jcache0 = jcache(0);
+
+ jcache0.putAll(vals);
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 60_000);
+
+ IgniteCache<Integer, Integer> jcache1 = jcache(1);
+
+ // Update from another node.
+ jcache1.putAll(vals);
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 61_000);
+
+ // Update from another node with provided TTL.
+ jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals);
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 1000);
+
+ waitExpired(vals.keySet());
+
+ // Try create again.
+ jcache0.putAll(vals);
+
+ // Update from the creating node (grid 0) with provided TTL, mirroring the last step of testNearCreateUpdate.
+ jcache0.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals);
+
+ for (Integer key : vals.keySet())
+ checkTtl(key, 1101L);
+
+ waitExpired(vals.keySet());
+ }
+
+ /**
+  * Picks fresh test keys relative to grid 0: one primary key, plus one backup key when there is more
+  * than one grid, plus one near key when the cache is not {@code REPLICATED}. The search starts from
+  * {@code lastKey}, which is then advanced past the largest key returned.
+  *
+  * @return Test keys.
+  * @throws Exception If failed.
+  */
+ private Collection<Integer> keys() throws Exception {
+     GridCache<Integer, Object> cache0 = cache(0);
+
+     Collection<Integer> res = new ArrayList<>();
+
+     res.add(primaryKeys(cache0, 1, lastKey).get(0));
+
+     if (gridCount() > 1) {
+         res.add(backupKeys(cache0, 1, lastKey).get(0));
+
+         boolean replicated = cache0.configuration().getCacheMode() == REPLICATED;
+
+         if (!replicated)
+             res.add(nearKeys(cache0, 1, lastKey).get(0));
+     }
+
+     // Next invocation must not hand out any of the keys returned now.
+     lastKey = Collections.max(res) + 1;
+
+     return res;
+ }
+
+ /**
+  * Single-key convenience form of {@link #waitExpired(Collection)}.
+  *
+  * @param key Key.
+  * @throws Exception If failed.
+  */
+ private void waitExpired(Integer key) throws Exception {
+     Collection<Integer> single = Collections.singleton(key);
+
+     waitExpired(single);
+ }
+
+ /**
+  * Waits (up to 3 seconds) until none of the given keys can be peeked locally on any grid, then
+  * asserts that every key is absent on every grid, both for local peek and for {@code get}.
+  *
+  * @param keys Keys.
+  * @throws Exception If failed.
+  */
+ private void waitExpired(final Collection<Integer> keys) throws Exception {
+     GridTestUtils.waitForCondition(new GridAbsPredicate() {
+         @Override public boolean apply() {
+             for (int i = 0; i < gridCount(); i++) {
+                 for (Integer key : keys) {
+                     Object val = jcache(i).localPeek(key);
+
+                     // Some entry is still present: keep waiting.
+                     if (val != null)
+                         return false;
+                 }
+             }
+
+             // Nothing is left on any grid: the condition is met, stop waiting right away.
+             return true;
+         }
+     }, 3000);
+
+     GridCache<Integer, Object> cache = cache(0);
+
+     for (int i = 0; i < gridCount(); i++) {
+         ClusterNode node = grid(i).cluster().localNode();
+
+         for (Integer key : keys) {
+             Object val = jcache(i).localPeek(key);
+
+             // Log the affinity role of the node before failing to make the failure easier to diagnose.
+             if (val != null) {
+                 log.info("Unexpected value [grid=" + i +
+                     ", primary=" + cache.affinity().isPrimary(node, key) +
+                     ", backup=" + cache.affinity().isBackup(node, key) + ']');
+             }
+
+             assertNull("Unexpected non-null value for grid " + i, val);
+         }
+     }
+
+     for (int i = 0; i < gridCount(); i++) {
+         for (Integer key : keys)
+             assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key));
+     }
+ }
+
+ /**
+  * Checks the TTL of the entry for the given key on every grid. A grid without an entry must be
+  * neither primary nor backup for the key; at least one grid must hold the entry.
+  *
+  * @param key Key.
+  * @param ttl Expected TTL.
+  * @throws Exception If failed.
+  */
+ private void checkTtl(Object key, long ttl) throws Exception {
+     boolean found = false;
+
+     for (int i = 0; i < gridCount(); i++) {
+         GridKernal kernal = (GridKernal)grid(i);
+
+         GridCacheAdapter<Object, Object> internal = kernal.context().cache().internalCache();
+
+         GridCacheEntryEx<Object, Object> entry = internal.peekEx(key);
+
+         // For a near cache also look into the underlying DHT cache.
+         if (entry == null && internal.context().isNear())
+             entry = internal.context().near().dht().peekEx(key);
+
+         if (entry == null) {
+             assertFalse(internal.affinity().isPrimaryOrBackup(kernal.localNode(), key));
+
+             continue;
+         }
+
+         found = true;
+
+         assertEquals("Unexpected ttl for grid " + i, ttl, entry.ttl());
+
+         // A positive TTL must produce an expire time, otherwise no expire time may be set.
+         if (ttl > 0)
+             assertTrue(entry.expireTime() > 0);
+         else
+             assertEquals(0, entry.expireTime());
+     }
+
+     assertTrue(found);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ GridCacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+ if (nearCache && gridName.equals(getTestGridName(0)))
+ cfg.setDistributionMode(NEAR_PARTITIONED);
+
+ cfg.setExpiryPolicyFactory(factory);
+
+ return cfg;
+ }
+
+ /**
+  * Expiry policy with separately configurable TTLs (in milliseconds) for creation, update and access.
+  * When a TTL is {@code null} the corresponding getter returns {@code null}.
+  */
+ private class TestPolicy implements ExpiryPolicy {
+     /** TTL for creation in milliseconds, {@code null} if not set. */
+     private final Long create;
+
+     /** TTL for access in milliseconds, {@code null} if not set. */
+     private final Long access;
+
+     /** TTL for update in milliseconds, {@code null} if not set. */
+     private final Long update;
+
+     /**
+      * @param create TTL for creation.
+      * @param update TTL for update.
+      * @param access TTL for access.
+      */
+     TestPolicy(@Nullable Long create,
+         @Nullable Long update,
+         @Nullable Long access) {
+         this.create = create;
+         this.update = update;
+         this.access = access;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Duration getExpiryForCreation() {
+         return duration(create);
+     }
+
+     /** {@inheritDoc} */
+     @Override public Duration getExpiryForAccess() {
+         return duration(access);
+     }
+
+     /** {@inheritDoc} */
+     @Override public Duration getExpiryForUpdate() {
+         return duration(update);
+     }
+
+     /**
+      * @param ttl TTL in milliseconds.
+      * @return Duration for the given TTL, or {@code null} if TTL is {@code null}.
+      */
+     @Nullable private Duration duration(@Nullable Long ttl) {
+         return ttl != null ? new Duration(TimeUnit.MILLISECONDS, ttl) : null;
+     }
+
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(TestPolicy.class, this);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
deleted file mode 100644
index f96e5a7..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
+++ /dev/null
@@ -1,507 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.apache.ignite.internal.processors.cache.expiry;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.testframework.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.configuration.*;
-import javax.cache.expiry.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
-import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
-import static org.gridgain.grid.cache.GridCacheMode.*;
-
-/**
- *
- */
-public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
- /** */
- private Factory<? extends ExpiryPolicy> factory;
-
- /** */
- private boolean nearCache;
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
- }
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 3;
- }
-
- public void testPrimary() throws Exception {
- factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
-
- nearCache = false;
-
- boolean inTx = false;
-
- startGrids();
-
- IgniteCache<Integer, Integer> cache = jcache(0);
-
- GridCache<Integer, Object> cache0 = cache(0);
-
- Integer key = primaryKey(cache0);
-
- log.info("Create: " + key);
-
- GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null;
-
- cache.put(key, 1);
-
- if (tx != null)
- tx.commit();
-
- checkTtl(key, 60_000);
-
- tx = inTx ? grid(0).transactions().txStart() : null;
-
- log.info("Update: " + key);
-
- cache.put(key, 2);
-
- if (tx != null)
- tx.commit();
-
- checkTtl(key, 61_000);
- }
-
- public void testBackup() throws Exception {
- factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
-
- nearCache = false;
-
- boolean inTx = false;
-
- startGrids();
-
- IgniteCache<Integer, Integer> cache = jcache(0);
-
- GridCache<Integer, Object> cache0 = cache(0);
-
- Integer key = backupKey(cache0);
-
- log.info("Create: " + key);
-
- GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null;
-
- cache.put(key, 1);
-
- if (tx != null)
- tx.commit();
-
- checkTtl(key, 60_000);
-
- tx = inTx ? grid(0).transactions().txStart() : null;
-
- log.info("Update: " + key);
-
- cache.put(key, 2);
-
- if (tx != null)
- tx.commit();
-
- checkTtl(key, 61_000);
- }
-
- public void testNear() throws Exception {
- factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
-
- nearCache = false;
-
- boolean inTx = true;
-
- startGrids();
-
- IgniteCache<Integer, Integer> cache = jcache(0);
-
- GridCache<Integer, Object> cache0 = cache(0);
-
- Integer key = nearKey(cache0);
-
- log.info("Create: " + key);
-
- GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null;
-
- cache.put(key, 1);
-
- if (tx != null)
- tx.commit();
-
- checkTtl(key, 60_000);
-
- tx = inTx ? grid(0).transactions().txStart() : null;
-
- log.info("Update: " + key);
-
- cache.put(key, 2);
-
- if (tx != null)
- tx.commit();
-
- checkTtl(key, 61_000);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void test1() throws Exception {
- factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null));
-
- nearCache = false;
-
- boolean inTx = true;
-
- startGrids();
-
- Collection<Integer> keys = keys();
-
- IgniteCache<Integer, Integer> cache = jcache(0);
-
- for (final Integer key : keys) {
- log.info("Test key1: " + key);
-
- GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null;
-
- cache.put(key, 1);
-
- if (tx != null)
- tx.commit();
- }
-
- for (final Integer key : keys) {
- log.info("Test key2: " + key);
-
- GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null;
-
- cache.put(key, 2);
-
- if (tx != null)
- tx.commit();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testCreated() throws Exception {
- factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null));
-
- startGrids();
-
- Collection<Integer> keys = keys();
-
- IgniteCache<Integer, Integer> cache = jcache(0);
-
- for (final Integer key : keys) {
- log.info("Test key: " + key);
-
- cache.put(key, 1);
-
- checkTtl(key, 60_000);
-
- for (int i = 0; i < gridCount(); i++) {
- assertEquals((Integer)1, cache.get(key));
-
- checkTtl(key, 60_000);
- }
-
- cache.withExpiryPolicy(new TestPolicy(1000L, null, null)).put(key, 2); // Update, should not change TTL.
-
- checkTtl(key, 60_000);
-
- assertEquals((Integer)2, cache.get(key));
-
- assertTrue(cache.remove(key));
-
- cache.withExpiryPolicy(new TestPolicy(1000L, null, null)).put(key, 3); // Create with provided TTL.
-
- checkTtl(key, 1000);
-
- waitExpired(key);
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testNearPut() throws Exception {
- factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null));
-
- nearCache = true;
-
- startGrids();
-
- GridCache<Integer, Object> cache0 = cache(0);
-
- Integer key = nearKey(cache0);
-
- IgniteCache<Integer, Integer> jcache0 = jcache(0);
-
- jcache0.put(key, 1);
-
- checkTtl(key, 60_000);
-
- IgniteCache<Integer, Integer> jcache1 = jcache(1);
-
- // Update from another node with provided TTL.
- jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 2);
-
- checkTtl(key, 1000);
-
- waitExpired(key);
-
- jcache1.remove(key);
-
- jcache0.put(key, 1);
-
- checkTtl(key, 60_000);
-
- // Update from near node with provided TTL.
- jcache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2);
-
- checkTtl(key, 1100);
-
- waitExpired(key);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testNearPutAll() throws Exception {
- factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null));
-
- nearCache = true;
-
- startGrids();
-
- Map<Integer, Integer> vals = new HashMap<>();
-
- for (int i = 0; i < 1000; i++)
- vals.put(i, i);
-
- IgniteCache<Integer, Integer> jcache0 = jcache(0);
-
- jcache0.putAll(vals);
-
- for (Integer key : vals.keySet())
- checkTtl(key, 60_000);
-
- IgniteCache<Integer, Integer> jcache1 = jcache(1);
-
- // Update from another node with provided TTL.
- jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals);
-
- for (Integer key : vals.keySet())
- checkTtl(key, 1000);
-
- waitExpired(vals.keySet());
-
- jcache0.removeAll(vals.keySet());
-
- jcache0.putAll(vals);
-
- // Update from near node with provided TTL.
- jcache1.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals);
-
- for (Integer key : vals.keySet())
- checkTtl(key, 1101L);
-
- waitExpired(vals.keySet());
- }
-
- /**
- * @return Test keys.
- * @throws Exception If failed.
- */
- private Collection<Integer> keys() throws Exception {
- GridCache<Integer, Object> cache = cache(0);
-
- Collection<Integer> keys = new ArrayList<>();
-
- keys.add(primaryKey(cache));
-
- if (gridCount() > 1) {
- keys.add(backupKey(cache));
-
- if (cache.configuration().getCacheMode() != REPLICATED)
- keys.add(nearKey(cache));
- }
-
- return keys;
- }
-
- /**
- * @param key Key.
- * @throws Exception If failed.
- */
- private void waitExpired(Integer key) throws Exception {
- waitExpired(Collections.singleton(key));
- }
-
- /**
- * @param keys Keys.
- * @throws Exception If failed.
- */
- private void waitExpired(final Collection<Integer> keys) throws Exception {
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- for (int i = 0; i < gridCount(); i++) {
- for (Integer key : keys) {
- Object val = jcache(i).localPeek(key);
-
- if (val != null) {
- // log.info("Value [grid=" + i + ", val=" + val + ']');
-
- return false;
- }
- }
- }
-
- return false;
- }
- }, 3000);
-
- GridCache<Integer, Object> cache = cache(0);
-
- for (int i = 0; i < gridCount(); i++) {
- ClusterNode node = grid(i).cluster().localNode();
-
- for (Integer key : keys) {
- Object val = jcache(i).localPeek(key);
-
- if (val != null) {
- log.info("Unexpected value [grid=" + i +
- ", primary=" + cache.affinity().isPrimary(node, key) +
- ", backup=" + cache.affinity().isBackup(node, key) + ']');
- }
-
- assertNull("Unexpected non-null value for grid " + i, val);
- }
- }
-
- for (int i = 0; i < gridCount(); i++) {
- for (Integer key : keys)
- assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key));
- }
- }
-
- /**
- * @param key Key.
- * @param ttl TTL.
- * @throws Exception If failed.
- */
- private void checkTtl(Object key, long ttl) throws Exception {
- for (int i = 0; i < gridCount(); i++) {
- GridKernal grid = (GridKernal)grid(i);
-
- GridCacheAdapter<Object, Object> cache = grid.context().cache().internalCache();
-
- GridCacheEntryEx<Object, Object> e = cache.peekEx(key);
-
- if (e == null && cache.context().isNear())
- e = cache.context().near().dht().peekEx(key);
-
- if (e == null)
- assertTrue(!cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
- else
- assertEquals("Unexpected ttl for grid " + i, ttl, e.ttl());
- }
- }
-
- /** {@inheritDoc} */
- @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
- assert factory != null;
-
- GridCacheConfiguration cfg = super.cacheConfiguration(gridName);
-
- cfg.setCacheMode(PARTITIONED);
- cfg.setAtomicityMode(TRANSACTIONAL);
-
- //cfg.setAtomicityMode(ATOMIC);
-
- cfg.setBackups(1);
-
- if (nearCache && gridName.equals(getTestGridName(0)))
- cfg.setDistributionMode(NEAR_PARTITIONED);
- else
- cfg.setDistributionMode(PARTITIONED_ONLY);
-
- cfg.setExpiryPolicyFactory(factory);
-
- return cfg;
- }
-
- /**
- *
- */
- private class TestPolicy implements ExpiryPolicy {
- /** */
- private Long create;
-
- /** */
- private Long access;
-
- /** */
- private Long update;
-
- /**
- * @param create TTL for creation.
- * @param access TTL for access.
- * @param update TTL for update.
- */
- TestPolicy(@Nullable Long create,
- @Nullable Long update,
- @Nullable Long access) {
- this.create = create;
- this.update = update;
- this.access = access;
- }
-
- /** {@inheritDoc} */
- @Override public Duration getExpiryForCreation() {
- return create != null ? new Duration(TimeUnit.MILLISECONDS, create) : null;
- }
-
- /** {@inheritDoc} */
- @Override public Duration getExpiryForAccess() {
- return access != null ? new Duration(TimeUnit.MILLISECONDS, access) : null;
- }
-
- /** {@inheritDoc} */
- @Override public Duration getExpiryForUpdate() {
- return update != null ? new Duration(TimeUnit.MILLISECONDS, update) : null;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(TestPolicy.class, this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
new file mode 100644
index 0000000..fd2d205
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -0,0 +1,35 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import junit.framework.*;
+
+/**
+ * Test suite aggregating the cache expiry policy test classes registered in {@link #suite()}.
+ */
+public class IgniteCacheExpiryPolicyTestSuite extends TestSuite {
+ /**
+ * @return Cache expiry policy test suite.
+ * @throws Exception If failed.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Cache Expiry Policy Test Suite");
+
+ suite.addTestSuite(IgniteCacheAtomicLocalExpiryPolicyTest.class);
+ suite.addTestSuite(IgniteCacheAtomicExpiryPolicyTest.class);
+ suite.addTestSuite(IgniteCacheAtomicReplicatedExpiryPolicyTest.class);
+
+ suite.addTestSuite(IgniteCacheTxLocalExpiryPolicyTest.class);
+ suite.addTestSuite(IgniteCacheTxExpiryPolicyTest.class);
+ suite.addTestSuite(IgniteCacheTxReplicatedExpiryPolicyTest.class);
+
+ return suite;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxExpiryPolicyTest.java
new file mode 100644
index 0000000..0abe27d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxExpiryPolicyTest.java
@@ -0,0 +1,41 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ * Expiry policy test configuration: 3 grids, {@code PARTITIONED} cache mode, {@code TRANSACTIONAL} atomicity, {@code PARTITIONED_ONLY} distribution.
+ */
+public class IgniteCacheTxExpiryPolicyTest extends IgniteCacheExpiryPolicyAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return PARTITIONED_ONLY;
+ }
+}
[2/5] incubator-ignite git commit: # ignite-41
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxLocalExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxLocalExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxLocalExpiryPolicyTest.java
new file mode 100644
index 0000000..91aa263
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxLocalExpiryPolicyTest.java
@@ -0,0 +1,41 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ * Expiry policy test configuration: 1 grid, {@code LOCAL} cache mode, {@code TRANSACTIONAL} atomicity, {@code PARTITIONED_ONLY} distribution.
+ */
+public class IgniteCacheTxLocalExpiryPolicyTest extends IgniteCacheExpiryPolicyAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return LOCAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode distributionMode() {
+ return PARTITIONED_ONLY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxReplicatedExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxReplicatedExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxReplicatedExpiryPolicyTest.java
new file mode 100644
index 0000000..b637b59
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTxReplicatedExpiryPolicyTest.java
@@ -0,0 +1,26 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ * Same configuration as {@link IgniteCacheTxExpiryPolicyTest}, except that the cache mode is {@code REPLICATED}.
+ */
+public class IgniteCacheTxReplicatedExpiryPolicyTest extends IgniteCacheTxExpiryPolicyTest {
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
index dac28ee..16e5b25 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
@@ -421,9 +421,18 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
- @Override public IgniteBiTuple<Boolean, V> innerUpdateLocal(GridCacheVersion ver, GridCacheOperation op,
- @Nullable Object writeObj, boolean writeThrough, boolean retval, long ttl, boolean evt, boolean metrics,
- @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, boolean intercept, UUID subjId, String taskName)
+ @Override public IgniteBiTuple<Boolean, V> innerUpdateLocal(GridCacheVersion ver,
+ GridCacheOperation op,
+ @Nullable Object writeObj,
+ boolean writeThrough,
+ boolean retval,
+ ExpiryPolicy expiryPlc,
+ boolean evt,
+ boolean metrics,
+ @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+ boolean intercept,
+ UUID subjId,
+ String taskName)
throws IgniteCheckedException, GridCacheEntryRemovedException {
return new IgniteBiTuple<>(false, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java
index 570bc77..3c56237 100644
--- a/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java
@@ -794,6 +794,14 @@ public abstract class GridAbstractTest extends TestCase {
}
/**
+ * @param idx Index.
+ * @return Ignite instance.
+ */
+ protected Ignite ignite(int idx) {
+ return G.ignite(getTestGridName(idx));
+ }
+
+ /**
* Gets grid for given test.
*
* @return Grid for given test.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
index 095d05a..8e858da 100644
--- a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
@@ -15,7 +15,6 @@ import org.apache.ignite.compute.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.affinity.*;
import org.gridgain.grid.kernal.*;
@@ -278,7 +277,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @return Collection of keys for which given cache is primary.
* @throws IgniteCheckedException If failed.
*/
- protected Integer primaryKey(GridCacheProjection<Integer, ?> cache)
+ protected Integer primaryKey(GridCacheProjection<?, ?> cache)
throws IgniteCheckedException {
return primaryKeys(cache, 1, 1).get(0);
}
@@ -289,7 +288,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @return Collection of keys for which given cache is primary.
* @throws IgniteCheckedException If failed.
*/
- protected List<Integer> primaryKeys(GridCacheProjection<Integer, ?> cache, int cnt)
+ protected List<Integer> primaryKeys(GridCacheProjection<?, ?> cache, int cnt)
throws IgniteCheckedException {
return primaryKeys(cache, cnt, 1);
}
@@ -301,8 +300,10 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @return Collection of keys for which given cache is primary.
* @throws IgniteCheckedException If failed.
*/
- protected List<Integer> primaryKeys(GridCacheProjection<Integer, ?> cache, int cnt, int startFrom)
+ protected List<Integer> primaryKeys(GridCacheProjection<?, ?> cache, int cnt, int startFrom)
throws IgniteCheckedException {
+ assert cnt > 0 : cnt;
+
List<Integer> found = new ArrayList<>(cnt);
ClusterNode locNode = cache.gridProjection().ignite().cluster().localNode();
@@ -328,7 +329,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @return Collection of keys for which given cache is backup.
* @throws IgniteCheckedException If failed.
*/
- protected Integer backupKey(GridCacheProjection<Integer, ?> cache)
+ protected Integer backupKey(GridCacheProjection<?, ?> cache)
throws IgniteCheckedException {
return backupKeys(cache, 1, 1).get(0);
}
@@ -339,7 +340,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @return Collection of keys for which given cache is backup.
* @throws IgniteCheckedException If failed.
*/
- protected List<Integer> backupKeys(GridCacheProjection<Integer, ?> cache, int cnt)
+ protected List<Integer> backupKeys(GridCacheProjection<?, ?> cache, int cnt)
throws IgniteCheckedException {
return backupKeys(cache, cnt, 1);
}
@@ -351,8 +352,10 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @return Collection of keys for which given cache is backup.
* @throws IgniteCheckedException If failed.
*/
- protected List<Integer> backupKeys(GridCacheProjection<Integer, ?> cache, int cnt, int startFrom)
+ protected List<Integer> backupKeys(GridCacheProjection<?, ?> cache, int cnt, int startFrom)
throws IgniteCheckedException {
+ assert cnt > 0 : cnt;
+
List<Integer> found = new ArrayList<>(cnt);
ClusterNode locNode = cache.gridProjection().ignite().cluster().localNode();
@@ -378,7 +381,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @return Collection of keys for which given cache is neither primary nor backup.
* @throws IgniteCheckedException If failed.
*/
- protected Integer nearKey(GridCacheProjection<Integer, ?> cache)
+ protected Integer nearKey(GridCacheProjection<?, ?> cache)
throws IgniteCheckedException {
return nearKeys(cache, 1, 1).get(0);
}
@@ -389,7 +392,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @return Collection of keys for which given cache is neither primary nor backup.
* @throws IgniteCheckedException If failed.
*/
- protected List<Integer> nearKeys(GridCacheProjection<Integer, ?> cache, int cnt)
+ protected List<Integer> nearKeys(GridCacheProjection<?, ?> cache, int cnt)
throws IgniteCheckedException {
return nearKeys(cache, cnt, 1);
}
@@ -401,8 +404,10 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @return Collection of keys for which given cache is neither primary nor backup.
* @throws IgniteCheckedException If failed.
*/
- protected List<Integer> nearKeys(GridCacheProjection<Integer, ?> cache, int cnt, int startFrom)
+ protected List<Integer> nearKeys(GridCacheProjection<?, ?> cache, int cnt, int startFrom)
throws IgniteCheckedException {
+ assert cnt > 0 : cnt;
+
List<Integer> found = new ArrayList<>(cnt);
ClusterNode locNode = cache.gridProjection().ignite().cluster().localNode();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d84da7d0/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
index 3059c4e..cab96bd 100644
--- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
@@ -37,7 +37,7 @@ public class GridDataGridTestSuite extends TestSuite {
public static TestSuite suite() throws Exception {
TestSuite suite = new TestSuite("Gridgain In-Memory Data Grid Test Suite");
- suite.addTestSuite(IgniteCacheTest.class);
+ suite.addTestSuite(IgniteCacheAbstractTest.class);
// Affinity tests.
suite.addTestSuite(GridCachePartitionFairAffinityNodesSelfTest.class);