You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/23 09:22:49 UTC
[13/38] incubator-ignite git commit: # ignite-41
# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/85de2471
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/85de2471
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/85de2471
Branch: refs/heads/ignite-1
Commit: 85de247138059e611110703165ddc07bded66cc4
Parents: 5f95bd2
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 18 15:05:39 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 18 17:50:41 2014 +0300
----------------------------------------------------------------------
.../cache/GridCacheAccessExpiryPolicy.java | 34 +++--
.../processors/cache/GridCacheAdapter.java | 2 +-
.../processors/cache/GridCacheEntryEx.java | 4 +-
.../processors/cache/GridCacheExpiryPolicy.java | 47 ++++++
.../processors/cache/GridCacheMapEntry.java | 74 ++++-----
.../processors/cache/GridCacheTxEntry.java | 2 +-
.../kernal/processors/cache/GridCacheUtils.java | 2 +-
.../distributed/GridCacheExpiryPolicy.java | 152 ------------------
.../GridCacheExternalizableExpiryPolicy.java | 153 +++++++++++++++++++
.../distributed/dht/GridDhtCacheAdapter.java | 10 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 4 +-
.../dht/GridPartitionedGetFuture.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 128 +++++++++++++---
.../dht/atomic/GridNearAtomicUpdateRequest.java | 2 +-
.../distributed/near/GridNearAtomicCache.java | 15 +-
.../distributed/near/GridNearCacheAdapter.java | 23 ++-
.../distributed/near/GridNearCacheEntry.java | 11 +-
.../distributed/near/GridNearGetFuture.java | 8 +-
.../near/GridNearTransactionalCache.java | 14 +-
.../local/atomic/GridLocalAtomicCache.java | 18 ++-
.../GridTcpCommunicationMessageFactory.java | 2 +-
.../IgniteCacheExpiryPolicyAbstractTest.java | 119 +++++++++++----
.../cache/GridCacheAbstractTtlSelfTest.java | 95 ------------
.../processors/cache/GridCacheTestEntryEx.java | 6 +-
.../near/GridCachePartitionedTtlSelfTest.java | 25 ---
.../GridCacheReplicatedTtlSelfTest.java | 24 ---
.../cache/local/GridCacheLocalTtlSelfTest.java | 25 ---
.../bamboo/GridDataGridTestSuite.java | 2 -
.../cache/GridCacheAbstractQuerySelfTest.java | 14 +-
29 files changed, 538 insertions(+), 483 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
index 07f4ae8..f0904dd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
@@ -20,9 +20,9 @@ import java.util.*;
/**
*
*/
-public class GridCacheAccessExpiryPolicy {
+public class GridCacheAccessExpiryPolicy implements GridCacheExpiryPolicy {
/** */
- private final long ttl;
+ private final long accessTtl;
/** */
private volatile Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries;
@@ -44,19 +44,27 @@ public class GridCacheAccessExpiryPolicy {
}
/**
- * @param ttl TTL for access.
+ * @param accessTtl TTL for access.
*/
- public GridCacheAccessExpiryPolicy(long ttl) {
- assert ttl >= 0 : ttl;
+ public GridCacheAccessExpiryPolicy(long accessTtl) {
+ assert accessTtl >= 0 : accessTtl;
- this.ttl = ttl;
+ this.accessTtl = accessTtl;
}
- /**
- * @return TTL.
- */
- public long ttl() {
- return ttl;
+ /** {@inheritDoc} */
+ @Override public long forAccess() {
+ return accessTtl;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long forCreate() {
+ return -1L;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long forUpdate() {
+ return -1L;
}
/**
@@ -75,7 +83,7 @@ public class GridCacheAccessExpiryPolicy {
* @param ver Entry version.
*/
@SuppressWarnings("unchecked")
- public void ttlUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) {
+ @Override public void onAccessUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) {
Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries0 = entries;
if (entries0 == null) {
@@ -93,7 +101,7 @@ public class GridCacheAccessExpiryPolicy {
/**
* @return TTL update request.
*/
- @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() {
+ @Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() {
return entries;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index a4ec90f..46029e7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -1765,7 +1765,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
final String taskName,
final boolean deserializePortable,
final boolean forcePrimary,
- @Nullable GridCacheAccessExpiryPolicy expiry,
+ @Nullable GridCacheExpiryPolicy expiry,
@Nullable final IgnitePredicate<GridCacheEntry<K, V>>... filter
) {
ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index c6e3ea6..b684183 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -292,7 +292,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
Object transformClo,
String taskName,
IgnitePredicate<GridCacheEntry<K, V>>[] filter,
- @Nullable GridCacheAccessExpiryPolicy expiryPlc)
+ @Nullable GridCacheExpiryPolicy expiryPlc)
throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException;
/**
@@ -425,7 +425,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
@Nullable byte[] valBytes,
boolean writeThrough,
boolean retval,
- @Nullable ExpiryPolicy expiryPlc,
+ @Nullable GridCacheExpiryPolicy expiryPlc,
boolean evt,
boolean metrics,
boolean primary,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java
new file mode 100644
index 0000000..3ce9572
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheExpiryPolicy.java
@@ -0,0 +1,47 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
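+ * Internal cache expiry policy: supplies TTLs for create, update and access operations and records entries whose TTL was updated on access.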
+ */
+public interface GridCacheExpiryPolicy {
+ /**
+ * @return TTL to apply when an entry is created.
+ */
+ public abstract long forCreate();
+
+ /**
+ * @return TTL to apply when an existing entry is updated.
+ */
+ public abstract long forUpdate();
+
+ /**
+ * @return TTL to apply when an entry is accessed.
+ */
+ public abstract long forAccess();
+
+ /**
+ * @param key Entry key.
+ * @param keyBytes Entry key bytes.
+ * @param ver Entry version.
+ */
+ public void onAccessUpdated(Object key, byte[] keyBytes, GridCacheVersion ver);
+
+ /**
+ * @return Entries recorded for a TTL update request (key mapped to key bytes and version), possibly {@code null}.
+ */
+ @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index c191dda..19d28b6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -700,7 +700,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
Object transformClo,
String taskName,
IgnitePredicate<GridCacheEntry<K, V>>[] filter,
- @Nullable GridCacheAccessExpiryPolicy expirePlc)
+ @Nullable GridCacheExpiryPolicy expirePlc)
throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException {
cctx.denyOnFlag(LOCAL);
@@ -733,7 +733,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
Object transformClo,
String taskName,
IgnitePredicate<GridCacheEntry<K, V>>[] filter,
- @Nullable GridCacheAccessExpiryPolicy expiryPlc)
+ @Nullable GridCacheExpiryPolicy expiryPlc)
throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException {
// Disable read-through if there is no store.
if (readThrough && !cctx.isStoreEnabled())
@@ -882,13 +882,13 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
}
if (ret != null && expiryPlc != null) {
- long ttl = expiryPlc.ttl();
+ long ttl = expiryPlc.forAccess();
assert ttl >= 0 : ttl;
updateTtl(ttl);
- expiryPlc.ttlUpdated(key(), getOrMarshalKeyBytes(), version());
+ expiryPlc.onAccessUpdated(key(), getOrMarshalKeyBytes(), version());
}
}
@@ -1464,6 +1464,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
assert cctx.isLocal() && cctx.atomic();
V old;
+
boolean res = true;
IgniteBiTuple<Boolean, ?> interceptorRes = null;
@@ -1501,8 +1502,16 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
if (!F.isEmpty(filter)) {
boolean pass = cctx.isAll(wrapFilterLocked(), filter);
- if (!pass)
+ if (!pass) {
+ if (expiryPlc != null && hasValueUnlocked()) {
+ long ttl = toTtl(expiryPlc.getExpiryForAccess());
+
+ if (ttl != -1L)
+ updateTtl(ttl);
+ }
+
return new IgniteBiTuple<>(false, retval ? old : null);
+ }
}
// Apply metrics.
@@ -1658,20 +1667,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
}
/**
- * @param expiryPlc Expiry policy.
- * @param isNew {@code True} if entry is new.
- * @return TTL.
- */
- private static long ttlFromPolicy(@Nullable ExpiryPolicy expiryPlc, boolean isNew) {
- if (expiryPlc == null)
- return -1L;
-
- Duration duration = isNew ? expiryPlc.getExpiryForCreation() : expiryPlc.getExpiryForUpdate();
-
- return toTtl(duration);
- }
-
- /**
* @param duration Duration.
* @return TTL.
*/
@@ -1703,7 +1698,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
@Nullable byte[] valBytes,
boolean writeThrough,
boolean retval,
- @Nullable ExpiryPolicy expiryPlc,
+ @Nullable GridCacheExpiryPolicy expiryPlc,
boolean evt,
boolean metrics,
boolean primary,
@@ -1750,7 +1745,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
op,
writeObj,
valBytes,
- ttlFromPolicy(expiryPlc, isNew()),
+ expiryPlc != null ? (isNew() ? expiryPlc.forCreate() : expiryPlc.forUpdate()) : -1L,
drTtl,
drExpireTime,
drVer);
@@ -1844,18 +1839,19 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
if (!pass) {
if (hasValueUnlocked() && expiryPlc != null) {
- Duration duration = expiryPlc.getExpiryForAccess();
+ newTtl = expiryPlc.forAccess();
- newTtl = toTtl(duration);
-
- if (newTtl != -1L)
+ if (newTtl != -1L) {
updateTtl(newTtl);
+
+ expiryPlc.onAccessUpdated(key, getOrMarshalKeyBytes(), version());
+ }
}
return new GridCacheUpdateAtomicResult<>(false,
retval ? old : null,
null,
- newTtl,
+ 0L,
-1L,
null,
null,
@@ -1929,25 +1925,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
else {
assert drExpireTime == -1L;
- if (expiryPlc != null) {
- if (!hadVal) {
- Duration duration = expiryPlc.getExpiryForCreation();
-
- if (duration != null && duration.isZero())
- return new GridCacheUpdateAtomicResult<>(false,
- retval ? old : null,
- null,
- 0L,
- -1L,
- null,
- null,
- false);
-
- newTtl = toTtl(duration);
- }
- else
- newTtl = toTtl(expiryPlc.getExpiryForUpdate());
- }
+ if (expiryPlc != null)
+ newTtl = hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate();
else
newTtl = -1L;
@@ -3428,6 +3407,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
/** {@inheritDoc} */
@Override public void updateTtl(GridCacheVersion ver, long ttl) {
synchronized (this) {
+ updateTtl(ttl);
+
+ /*
+ TODO IGNITE-41.
try {
if (ver.equals(version()))
updateTtl(ttl);
@@ -3435,6 +3418,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
catch (GridCacheEntryRemovedException ignored) {
// No-op.
}
+ */
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
index df888e7..0bff22b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
@@ -836,7 +836,7 @@ public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizab
out.writeBoolean(grpLock);
CU.writeVersion(out, drVer);
- out.writeObject(transferExpiryPlc ? new GridCacheExpiryPolicy(expiryPlc) : null);
+ out.writeObject(transferExpiryPlc ? new GridCacheExternalizableExpiryPolicy(expiryPlc) : null);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
index d28f728..77f6a7a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
@@ -1586,7 +1586,7 @@ public class GridCacheUtils {
* @return Expire time.
*/
public static long toExpireTime(long ttl) {
- assert ttl >= 0L;
+ assert ttl >= 0L : ttl;
if (ttl == 0L)
return 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
deleted file mode 100644
index 3a77884..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed;
-
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.expiry.*;
-import java.io.*;
-import java.util.concurrent.*;
-
-/**
- *
- */
-public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
- /** */
- private ExpiryPolicy plc;
-
- /** */
- private static final byte CREATE_TTL_MASK = 0x01;
-
- /** */
- private static final byte UPDATE_TTL_MASK = 0x02;
-
- /** */
- private static final byte ACCESS_TTL_MASK = 0x04;
-
- /** */
- private Duration forCreate;
-
- /** */
- private Duration forUpdate;
-
- /** */
- private Duration forAccess;
-
- /**
- * Required by {@link Externalizable}.
- */
- public GridCacheExpiryPolicy() {
- // No-op.
- }
-
- /**
- * @param plc Expiry policy.
- */
- public GridCacheExpiryPolicy(ExpiryPolicy plc) {
- assert plc != null;
-
- this.plc = plc;
- }
-
- /** {@inheritDoc} */
- @Override public Duration getExpiryForCreation() {
- return forCreate;
- }
-
- /** {@inheritDoc} */
- @Override public Duration getExpiryForAccess() {
- return forAccess;
- }
-
- /** {@inheritDoc} */
- @Override public Duration getExpiryForUpdate() {
- return forUpdate;
- }
-
- /**
- * @param out Output stream.
- * @param duration Duration.
- * @throws IOException
- */
- private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException {
- if (duration != null) {
- if (duration.isEternal())
- out.writeLong(0);
- else if (duration.getDurationAmount() == 0)
- out.writeLong(1);
- else
- out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount()));
- }
- }
-
- /**
- * @param in Input stream.
- * @return Duration.
- * @throws IOException
- */
- private Duration readDuration(ObjectInput in) throws IOException {
- long ttl = in.readLong();
-
- assert ttl >= 0;
-
- if (ttl == 0)
- return Duration.ETERNAL;
-
- return new Duration(TimeUnit.MILLISECONDS, ttl);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- byte flags = 0;
-
- Duration create = plc.getExpiryForCreation();
-
- if (create != null)
- flags |= CREATE_TTL_MASK;
-
- Duration update = plc.getExpiryForUpdate();
-
- if (update != null)
- flags |= UPDATE_TTL_MASK;
-
- Duration access = plc.getExpiryForAccess();
-
- if (access != null)
- flags |= ACCESS_TTL_MASK;
-
- out.writeByte(flags);
-
- writeDuration(out, create);
-
- writeDuration(out, update);
-
- writeDuration(out, access);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- byte flags = in.readByte();
-
- if ((flags & CREATE_TTL_MASK) != 0)
- forCreate = readDuration(in);
-
- if ((flags & UPDATE_TTL_MASK) != 0)
- forUpdate = readDuration(in);
-
- if ((flags & ACCESS_TTL_MASK) != 0)
- forAccess = readDuration(in);
- }
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheExpiryPolicy.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java
new file mode 100644
index 0000000..75da5de
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExternalizableExpiryPolicy.java
@@ -0,0 +1,153 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed;
+
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.expiry.*;
+import java.io.*;
+import java.util.concurrent.*;
+
+/**
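+ * {@link Externalizable} wrapper for {@link ExpiryPolicy}: writes the wrapped policy's durations and exposes the durations read back on the receiving side.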
+ */
+public class GridCacheExternalizableExpiryPolicy implements ExpiryPolicy, Externalizable {
+ /** */
+ private ExpiryPolicy plc;
+
+ /** */
+ private static final byte CREATE_TTL_MASK = 0x01;
+
+ /** */
+ private static final byte UPDATE_TTL_MASK = 0x02;
+
+ /** */
+ private static final byte ACCESS_TTL_MASK = 0x04;
+
+ /** */
+ private Duration forCreate;
+
+ /** */
+ private Duration forUpdate;
+
+ /** */
+ private Duration forAccess;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public GridCacheExternalizableExpiryPolicy() {
+ // No-op.
+ }
+
+ /**
+ * @param plc Expiry policy.
+ */
+ public GridCacheExternalizableExpiryPolicy(ExpiryPolicy plc) {
+ assert plc != null;
+
+ this.plc = plc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Duration getExpiryForCreation() {
+ return forCreate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Duration getExpiryForAccess() {
+ return forAccess;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Duration getExpiryForUpdate() {
+ return forUpdate;
+ }
+
+ /**
+ * @param out Output stream.
+ * @param duration Duration.
+ * @throws IOException If failed.
+ */
+ private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException {
+ if (duration != null) {
+ if (duration.isEternal())
+ out.writeLong(0);
+ else if (duration.getDurationAmount() == 0)
+ out.writeLong(1);
+ else
+ out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount()));
+ }
+ }
+
+ /**
+ * @param in Input stream.
+ * @return Duration.
+ * @throws IOException If failed.
+ */
+ private Duration readDuration(ObjectInput in) throws IOException {
+ long ttl = in.readLong();
+
+ assert ttl >= 0;
+
+ if (ttl == 0)
+ return Duration.ETERNAL;
+
+ return new Duration(TimeUnit.MILLISECONDS, ttl);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ byte flags = 0;
+
+ Duration create = plc.getExpiryForCreation();
+
+ if (create != null)
+ flags |= CREATE_TTL_MASK;
+
+ Duration update = plc.getExpiryForUpdate();
+
+ if (update != null)
+ flags |= UPDATE_TTL_MASK;
+
+ Duration access = plc.getExpiryForAccess();
+
+ if (access != null)
+ flags |= ACCESS_TTL_MASK;
+
+ out.writeByte(flags);
+
+ writeDuration(out, create);
+
+ writeDuration(out, update);
+
+ writeDuration(out, access);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ byte flags = in.readByte();
+
+ if ((flags & CREATE_TTL_MASK) != 0)
+ forCreate = readDuration(in);
+
+ if ((flags & UPDATE_TTL_MASK) != 0)
+ forUpdate = readDuration(in);
+
+ if ((flags & ACCESS_TTL_MASK) != 0)
+ forAccess = readDuration(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheExternalizableExpiryPolicy.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index fef3b82..a7b1fea 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -428,7 +428,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
/**
* This method is used internally. Use
- * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, long, UUID, int, boolean, IgnitePredicate[], GridCacheAccessExpiryPolicy)}
+ * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, long, UUID, int, boolean, IgnitePredicate[], org.gridgain.grid.kernal.processors.cache.GridCacheExpiryPolicy)}
* method instead to retrieve DHT value.
*
* @param keys {@inheritDoc}
@@ -483,7 +483,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
String taskName,
boolean deserializePortable,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
- @Nullable GridCacheAccessExpiryPolicy expiry
+ @Nullable GridCacheExpiryPolicy expiry
) {
return getAllAsync(keys,
null,
@@ -518,7 +518,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
int taskNameHash,
boolean deserializePortable,
IgnitePredicate<GridCacheEntry<K, V>>[] filter,
- @Nullable GridCacheAccessExpiryPolicy expiry) {
+ @Nullable GridCacheExpiryPolicy expiry) {
GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx,
msgId,
reader,
@@ -599,7 +599,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
/**
* @param expiryPlc Expiry policy.
*/
- protected void sendTtlUpdateRequest(@Nullable final GridCacheAccessExpiryPolicy expiryPlc) {
+ public void sendTtlUpdateRequest(@Nullable final GridCacheExpiryPolicy expiryPlc) {
if (expiryPlc != null && expiryPlc.entries() != null) {
ctx.closures().runLocalSafe(new Runnable() {
@SuppressWarnings({"unchecked", "ForLoopReplaceableByForEach"})
@@ -622,7 +622,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node);
if (req == null) {
- reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(expiryPlc.ttl()));
+ reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(expiryPlc.forAccess()));
req.cacheId(ctx.cacheId());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 2ce4b6c..7a0db43 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -84,7 +84,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
private boolean deserializePortable;
/** Expiry policy. */
- private GridCacheAccessExpiryPolicy expiryPlc;
+ private GridCacheExpiryPolicy expiryPlc;
/**
* Empty constructor required for {@link Externalizable}.
@@ -119,7 +119,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
@Nullable UUID subjId,
int taskNameHash,
boolean deserializePortable,
- @Nullable GridCacheAccessExpiryPolicy expiryPlc) {
+ @Nullable GridCacheExpiryPolicy expiryPlc) {
super(cctx.kernalContext(), CU.<GridCacheEntryInfo<K, V>>collectionsReducer());
assert reader != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 4a3491d..942d0c5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -89,7 +89,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
private boolean deserializePortable;
/** Expiry policy. */
- private GridCacheAccessExpiryPolicy expiryPlc;
+ private GridCacheExpiryPolicy expiryPlc;
/**
* Empty constructor required for {@link Externalizable}.
@@ -121,7 +121,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
- @Nullable GridCacheAccessExpiryPolicy expiryPlc
+ @Nullable GridCacheExpiryPolicy expiryPlc
) {
super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
@@ -363,7 +363,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
filters,
subjId,
taskName == null ? 0 : taskName.hashCode(),
- expiryPlc != null ? expiryPlc.ttl() : -1L);
+ expiryPlc != null ? expiryPlc.forAccess() : -1L);
add(fut); // Append new future.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 3b07fc0..c3ef323 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -16,7 +16,6 @@ import org.apache.ignite.plugin.security.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.managers.communication.*;
import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*;
import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
@@ -893,6 +892,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+ GridCacheExpiryPolicy expiry = null;
+
try {
// If batch store update is enabled, we need to lock all entries.
// First, need to acquire locks on cache entries, then check filter.
@@ -941,17 +942,37 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean replicate = ctx.isDrEnabled();
+ expiry = expiryPolicy(req.expiry() != null ? req.expiry() : ctx.expiry());
+
if (storeEnabled() && keys.size() > 1 && !ctx.dr().receiveEnabled()) {
// This method can only be used when there are no replicated entries in the batch.
- UpdateBatchResult<K, V> updRes = updateWithBatch(node, hasNear, req, res, locked, ver,
- dhtFut, completionCb, replicate, taskName);
+ UpdateBatchResult<K, V> updRes = updateWithBatch(node,
+ hasNear,
+ req,
+ res,
+ locked,
+ ver,
+ dhtFut,
+ completionCb,
+ replicate,
+ taskName,
+ expiry);
deleted = updRes.deleted();
dhtFut = updRes.dhtFuture();
}
else {
- UpdateSingleResult<K, V> updRes = updateSingle(node, hasNear, req, res, locked, ver,
- dhtFut, completionCb, replicate, taskName);
+ UpdateSingleResult<K, V> updRes = updateSingle(node,
+ hasNear,
+ req,
+ res,
+ locked,
+ ver,
+ dhtFut,
+ completionCb,
+ replicate,
+ taskName,
+ expiry);
retVal = updRes.returnValue();
deleted = updRes.deleted();
@@ -1013,6 +1034,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
else
completionCb.apply(req, res);
}
+
+ sendTtlUpdateRequest(expiry);
}
/**
@@ -1027,6 +1050,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param dhtFut Optional DHT future.
* @param completionCb Completion callback to invoke when DHT future is completed.
* @param replicate Whether replication is enabled.
+ * @param taskName Task name.
+ * @param expiry Expiry policy.
* @return Deleted entries.
* @throws GridCacheEntryRemovedException Should not be thrown.
*/
@@ -1041,7 +1066,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut,
CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb,
boolean replicate,
- String taskName
+ String taskName,
+ @Nullable GridCacheExpiryPolicy expiry
) throws GridCacheEntryRemovedException {
// Cannot update in batches during DR due to possible conflicts.
assert !req.returnValue(); // Should not request return values for putAll.
@@ -1078,7 +1104,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
if (!checkFilter(entry, req, res)) {
- // TODO IGNITE-41 update TTL.
+ if (expiry != null && entry.hasValue()) {
+ long ttl = expiry.forAccess();
+
+ if (ttl != -1L) {
+ entry.updateTtl(null, ttl);
+
+ expiry.onAccessUpdated(entry.key(), entry.getOrMarshalKeyBytes(), entry.version());
+ }
+ }
if (log.isDebugEnabled())
log.debug("Entry did not pass the filter (will skip write) [entry=" + entry +
@@ -1143,7 +1177,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res,
replicate,
updRes,
- taskName);
+ taskName,
+ expiry);
firstEntryIdx = i + 1;
@@ -1184,7 +1219,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res,
replicate,
updRes,
- taskName);
+ taskName,
+ expiry);
firstEntryIdx = i + 1;
@@ -1293,7 +1329,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res,
replicate,
updRes,
- taskName);
+ taskName,
+ expiry);
}
else
assert filtered.isEmpty();
@@ -1366,6 +1403,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param completionCb Completion callback to invoke when DHT future is completed.
* @param replicate Whether DR is enabled for that cache.
* @param taskName Task name.
+ * @param expiry Expiry policy.
* @return Return value.
* @throws GridCacheEntryRemovedException Should be never thrown.
*/
@@ -1379,7 +1417,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut,
CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb,
boolean replicate,
- String taskName
+ String taskName,
+ @Nullable GridCacheExpiryPolicy expiry
) throws GridCacheEntryRemovedException {
GridCacheReturn<Object> retVal = null;
Collection<IgniteBiTuple<GridDhtCacheEntry<K, V>, GridCacheVersion>> deleted = null;
@@ -1394,8 +1433,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
- ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
-
// Avoid iterator creation.
for (int i = 0; i < keys.size(); i++) {
K k = keys.get(i);
@@ -1505,8 +1542,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expireTime);
}
else {
- // TODO IGNITE-41 ttl could be changed.
-
if (log.isDebugEnabled())
log.debug("Entry did not pass the filter or conflict resolution (will skip write) " +
"[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
@@ -1610,7 +1645,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final GridNearAtomicUpdateResponse<K, V> res,
boolean replicate,
UpdateBatchResult<K, V> batchRes,
- String taskName
+ String taskName,
+ @Nullable GridCacheExpiryPolicy expiry
) {
assert putMap == null ^ rmvKeys == null;
@@ -1658,8 +1694,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
- ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
-
// Avoid iterator creation.
for (int i = 0; i < entries.size(); i++) {
GridDhtCacheEntry<K, V> entry = entries.get(i);
@@ -1703,7 +1737,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
primary,
ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
- req.filter(), // TODO IGNITE-41 filter already evaluated?
+ null,
replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
-1L,
-1L,
@@ -2375,6 +2409,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
+ /**
+ * @param plc Expiry policy, may be {@code null}.
+ * @return Expiry policy wrapper, or {@code null} if {@code plc} is {@code null}.
+ */
+ static GridCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) {
+ return plc == null ? null : new UpdateExpiryPolicy(plc);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtAtomicCache.class, this, super.toString());
@@ -2652,4 +2694,52 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
pendingResponses.remove(nodeId, this);
}
}
+
+ /**
+ * Wraps an {@link ExpiryPolicy}: returns its create/update/access expiry values passed through {@link GridCacheMapEntry#toTtl} and collects entries given to {@link #onAccessUpdated}.
+ */
+ private static class UpdateExpiryPolicy implements GridCacheExpiryPolicy {
+ /** Wrapped expiry policy, never {@code null}. */
+ private final ExpiryPolicy plc;
+
+ /** Entries passed to {@link #onAccessUpdated}, key mapped to (key bytes, version); {@code null} until the first call. */
+ private Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries;
+
+ /**
+ * @param plc Expiry policy to wrap, must not be {@code null}.
+ */
+ private UpdateExpiryPolicy(ExpiryPolicy plc) {
+ assert plc != null;
+
+ this.plc = plc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long forCreate() {
+ return GridCacheMapEntry.toTtl(plc.getExpiryForCreation());
+ }
+
+ /** {@inheritDoc} */
+ @Override public long forUpdate() {
+ return GridCacheMapEntry.toTtl(plc.getExpiryForUpdate());
+ }
+
+ /** {@inheritDoc} */
+ @Override public long forAccess() {
+ return GridCacheMapEntry.toTtl(plc.getExpiryForAccess());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAccessUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) {
+ if (entries == null)
+ entries = new HashMap<>(); // Allocated on first use.
+
+ entries.put(key, new IgniteBiTuple<>(keyBytes, ver));
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() {
+ return entries;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 3eca7e2..1265edb 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -492,7 +492,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
filterBytes = marshalFilter(filter, ctx);
if (expiryPlc != null)
- expiryPlcBytes = CU.marshal(ctx, new GridCacheExpiryPolicy(expiryPlc));
+ expiryPlcBytes = CU.marshal(ctx, new GridCacheExternalizableExpiryPolicy(expiryPlc));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
index 1da6626..5c89c3f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -23,7 +23,6 @@ import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
-import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
@@ -363,9 +362,19 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
- subjId = ctx.subjectIdPerCall(subjId);
+ GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
- return loadAsync(null, keys, false, forcePrimary, filter, subjId, taskName, deserializePortable);
+ subjId = ctx.subjectIdPerCall(subjId, prj);
+
+ return loadAsync(null,
+ keys,
+ false,
+ forcePrimary,
+ filter,
+ subjId,
+ taskName,
+ deserializePortable,
+ prj != null ? prj.expiry() : null);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 6c2fa8c..6f71d3c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -21,6 +21,7 @@ import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
@@ -164,7 +165,15 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
@Override public IgniteFuture<Object> readThroughAllAsync(Collection<? extends K> keys, boolean reload,
GridCacheTxEx<K, V> tx, IgnitePredicate<GridCacheEntry<K, V>>[] filter, @Nullable UUID subjId, String taskName,
IgniteBiInClosure<K, V> vis) {
- return (IgniteFuture)loadAsync(tx, keys, reload, false, filter, subjId, taskName, true);
+ return (IgniteFuture)loadAsync(tx,
+ keys,
+ reload,
+ false,
+ filter,
+ subjId,
+ taskName,
+ true,
+ null);
}
/** {@inheritDoc} */
@@ -246,6 +255,10 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
* @param reload Reload flag.
* @param forcePrimary Force primary flag.
* @param filter Filter.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param deserializePortable Deserialize portable flag.
+ * @param expiryPlc Expiry policy.
* @return Loaded values.
*/
public IgniteFuture<Map<K, V>> loadAsync(@Nullable GridCacheTxEx tx,
@@ -255,7 +268,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@Nullable UUID subjId,
String taskName,
- boolean deserializePortable) {
+ boolean deserializePortable,
+ @Nullable ExpiryPolicy expiryPlc) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
@@ -264,7 +278,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
GridCacheTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (GridCacheTxLocalEx<K, V>)tx : null;
- // TODO IGNITE-41.
+ final GridCacheAccessExpiryPolicy expiry =
+ GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
keys,
@@ -275,7 +290,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
subjId,
taskName,
deserializePortable,
- null);
+ expiry);
// init() will register future for responses if future has remote mappings.
fut.init();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java
index 7759f49..967496b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -312,8 +312,15 @@ public class GridNearCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
/** {@inheritDoc} */
@Override protected V readThrough(GridCacheTxEx<K, V> tx, K key, boolean reload,
IgnitePredicate<GridCacheEntry<K, V>>[] filter, UUID subjId, String taskName) throws IgniteCheckedException {
- return cctx.near().loadAsync(tx, F.asList(key), reload, /*force primary*/false, filter, subjId, taskName, true).
- get().get(key);
+ return cctx.near().loadAsync(tx,
+ F.asList(key),
+ reload,
+ /*force primary*/false,
+ filter,
+ subjId,
+ taskName,
+ true,
+ null).get().get(key);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
index b5d5e29..ea8450a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -91,7 +91,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
private boolean deserializePortable;
/** Expiry policy. */
- private GridCacheAccessExpiryPolicy expiryPlc;
+ private GridCacheExpiryPolicy expiryPlc;
/**
* Empty constructor required for {@link Externalizable}.
@@ -123,7 +123,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
- @Nullable GridCacheAccessExpiryPolicy expiryPlc
+ @Nullable GridCacheExpiryPolicy expiryPlc
) {
super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
@@ -238,6 +238,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
if (trackable)
cctx.mvcc().removeFuture(this);
+ cache().dht().sendTtlUpdateRequest(expiryPlc);
+
return true;
}
@@ -366,7 +368,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
filters,
subjId,
taskName == null ? 0 : taskName.hashCode(),
- expiryPlc != null ? expiryPlc.ttl() : -1L);
+ expiryPlc != null ? expiryPlc.forAccess() : -1L);
add(fut); // Append new future.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
index c8d90e2..416d193 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -108,9 +108,19 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
});
}
- subjId = ctx.subjectIdPerCall(subjId);
+ GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
- return loadAsync(null, keys, false, forcePrimary, filter, subjId, taskName, deserializePortable);
+ subjId = ctx.subjectIdPerCall(subjId, prj);
+
+ return loadAsync(null,
+ keys,
+ false,
+ forcePrimary,
+ filter,
+ subjId,
+ taskName,
+ deserializePortable,
+ prj != null ? prj.expiry() : null);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index d901311..6f2b4e4 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -257,7 +257,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (GridCacheReturn<V>)updateAllInternal(DELETE,
Collections.singleton(key),
null,
- null,
+ expiryPerCall(),
true,
true,
ctx.equalsPeekArray(val),
@@ -384,7 +384,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (V)updateAllInternal(DELETE,
Collections.singleton(key),
null,
- null,
+ expiryPerCall(),
true,
false,
filter,
@@ -410,7 +410,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
updateAllInternal(DELETE,
keys,
null,
- null,
+ expiryPerCall(),
false,
false,
filter,
@@ -437,7 +437,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(DELETE,
Collections.singleton(key),
null,
- null,
+ expiryPerCall(),
false,
false,
filter,
@@ -465,7 +465,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return (Boolean)updateAllInternal(DELETE,
Collections.singleton(key),
null,
- null,
+ expiryPerCall(),
false,
false,
ctx.equalsPeekArray(val),
@@ -686,10 +686,14 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
@Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter
) {
final GridCacheOperation op = transformMap != null ? TRANSFORM : UPDATE;
+
final Collection<? extends K> keys =
map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : null;
+
final Collection<?> vals = map != null ? map.values() : transformMap != null ? transformMap.values() : null;
+
final boolean storeEnabled = ctx.isStoreEnabled();
+
final ExpiryPolicy expiry = expiryPerCall();
return asyncOp(new Callable<Object>() {
@@ -723,12 +727,14 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
) {
final boolean storeEnabled = ctx.isStoreEnabled();
+ final ExpiryPolicy expiryPlc = expiryPerCall();
+
return asyncOp(new Callable<Object>() {
@Override public Object call() throws Exception {
return updateAllInternal(DELETE,
keys,
null,
- null,
+ expiryPlc,
retval,
rawRetval,
filter,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
index b2ae55b..b238600 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
@@ -45,7 +45,7 @@ public class GridTcpCommunicationMessageFactory {
private static final Map<Byte, GridTcpCommunicationMessageProducer> CUSTOM = new ConcurrentHashMap8<>();
/** */
- public static final int MAX_COMMON_TYPE = 81;
+ public static final int MAX_COMMON_TYPE = 100;
static {
registerCommon(new GridTcpCommunicationMessageProducer() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 6c17203..326b232 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -142,6 +142,12 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
}
accessGetAll();
+
+ for (final Integer key : keys()) {
+ log.info("Test filter access [key=" + key + ']');
+
+ filterAccessRemove(key);
+ }
}
/**
@@ -155,7 +161,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
checkTtl(key, 60_000L);
- assertEquals((Integer)1, cache.get(key));
+ assertEquals((Integer) 1, cache.get(key));
checkTtl(key, 62_000L, true);
@@ -167,6 +173,22 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
}
/**
+ * @param key Key to put and then try to remove with a non-matching value.
+ * @throws Exception If failed.
+ */
+ private void filterAccessRemove(Integer key) throws Exception {
+ IgniteCache<Integer, Integer> cache = jcache();
+
+ cache.put(key, 1);
+
+ checkTtl(key, 60_000L);
+
+ assertFalse(cache.remove(key, 2)); // Stored value is 1, so the conditional remove must not succeed.
+
+ checkTtl(key, 62_000L, true); // TTL is expected to change even though nothing was removed.
+ }
+
+ /**
* @throws Exception If failed.
*/
private void accessGetAll() throws Exception {
@@ -179,10 +201,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
cache.removeAll(vals.keySet());
- for (Map.Entry<Integer, Integer> e : vals.entrySet())
- cache.put(e.getKey(), e.getValue());
-
- //cache.putAll(vals);
+ cache.putAll(vals);
for (Integer key : vals.keySet())
checkTtl(key, 60_000L);
@@ -456,41 +475,52 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
if (cacheMode() != PARTITIONED)
return;
- factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
-
nearCache = true;
- startGrids();
+ testCreateUpdate();
+
+ nearReaderUpdate();
- Integer key = nearKey(cache(0));
+ nearPutAll();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void nearReaderUpdate() throws Exception {
+ log.info("Test near reader update.");
+
+ Integer key = nearKeys(cache(0), 1, 500_000).get(0);
- IgniteCache<Integer, Integer> jcache0 = jcache(0);
+ IgniteCache<Integer, Integer> cache0 = jcache(0);
- jcache0.put(key, 1);
+ assertEquals(NEAR_PARTITIONED, cache(0).configuration().getDistributionMode());
+
+ cache0.put(key, 1);
checkTtl(key, 60_000L);
- IgniteCache<Integer, Integer> jcache1 = jcache(1);
+ IgniteCache<Integer, Integer> cache1 = jcache(1);
// Update from another node.
- jcache1.put(key, 2);
+ cache1.put(key, 2);
checkTtl(key, 61_000L);
// Update from another node with provided TTL.
- jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 3);
+ cache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 3);
checkTtl(key, 1000L);
waitExpired(key);
// Try create again.
- jcache0.put(key, 1);
+ cache0.put(key, 1);
checkTtl(key, 60_000L);
// Update from near node with provided TTL.
- jcache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2);
+ cache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2);
checkTtl(key, 1100L);
@@ -500,38 +530,31 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
/**
* @throws Exception If failed.
*/
- public void testNearPutAll() throws Exception {
- if (cacheMode() != PARTITIONED)
- return;
-
- factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
-
- nearCache = true;
-
- startGrids();
-
+ private void nearPutAll() throws Exception {
Map<Integer, Integer> vals = new HashMap<>();
for (int i = 0; i < 1000; i++)
vals.put(i, i);
- IgniteCache<Integer, Integer> jcache0 = jcache(0);
+ IgniteCache<Integer, Integer> cache0 = jcache(0);
- jcache0.putAll(vals);
+ cache0.removeAll(vals.keySet());
+
+ cache0.putAll(vals);
for (Integer key : vals.keySet())
checkTtl(key, 60_000L);
- IgniteCache<Integer, Integer> jcache1 = jcache(1);
+ IgniteCache<Integer, Integer> cache1 = jcache(1);
// Update from another node.
- jcache1.putAll(vals);
+ cache1.putAll(vals);
for (Integer key : vals.keySet())
checkTtl(key, 61_000L);
// Update from another node with provided TTL.
- jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals);
+ cache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals);
for (Integer key : vals.keySet())
checkTtl(key, 1000L);
@@ -539,10 +562,10 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
waitExpired(vals.keySet());
// Try create again.
- jcache0.putAll(vals);
+ cache0.putAll(vals);
// Update from near node with provided TTL.
- jcache1.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals);
+ cache1.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals);
for (Integer key : vals.keySet())
checkTtl(key, 1101L);
@@ -551,6 +574,36 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
}
/**
+ * @throws Exception If failed.
+ */
+ public void testNearAccess() throws Exception {
+ if (cacheMode() != PARTITIONED)
+ return; // Only run for PARTITIONED mode.
+
+ nearCache = true;
+
+ testAccess();
+
+ Integer key = primaryKeys(cache(0), 1, 500_000).get(0); // NOTE(review): presumably a key whose primary is grid 0 - confirm.
+
+ IgniteCache<Integer, Integer> cache0 = jcache(0);
+
+ cache0.put(key, 1);
+
+ checkTtl(key, 60_000L);
+
+ assertEquals(1, jcache(1).get(key));
+
+ checkTtl(key, 62_000L, true); // TTL expected after the get through jcache(1).
+
+ assertEquals(1, jcache(2).withExpiryPolicy(new TestPolicy(1100L, 1200L, 1000L)).get(key));
+
+ checkTtl(key, 1000L, true); // Matches the third TestPolicy argument passed to the get above.
+
+ waitExpired(key);
+ }
+
+ /**
* @return Test keys.
* @throws Exception If failed.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractTtlSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractTtlSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractTtlSelfTest.java
deleted file mode 100644
index a6b3f8f..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractTtlSelfTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
-* __ ____/___________(_)______ /__ ____/______ ____(_)_______
-* _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
-* / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
-* \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
-*/
-
-package org.gridgain.grid.kernal.processors.cache;
-
-import org.apache.ignite.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.store.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.testframework.*;
-
-/**
- * Entry time-to-live abstract test.
- */
-public abstract class GridCacheAbstractTtlSelfTest extends GridCacheAbstractSelfTest {
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 2;
- }
-
- /** {@inheritDoc} */
- @Override protected GridCacheStore<?, ?> cacheStore() {
- return null;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testGetExpired() throws Exception {
- final GridCache<String, Integer> c = cache();
-
- final String key = "1";
-
- int ttl = 500;
-
- GridCacheEntry<String, Integer> entry = c.entry(key);
-
- entry.timeToLive(ttl);
-
- entry.setValue(1);
-
- checkKeyIsRetired(key, ttl);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testGetExpiredTx() throws Exception {
- GridCache<String, Integer> c = cache();
-
- String key = "1";
- int ttl = 500;
-
- try (GridCacheTx tx = c.txStart()) {
- GridCacheEntry<String, Integer> entry = c.entry(key);
-
- entry.timeToLive(ttl);
-
- entry.setValue(1);
-
- tx.commit();
- }
-
- checkKeyIsRetired(key, ttl);
- }
-
- /**
- * Checks if the given cache has entry with the given key with some timeout based on the given TTL.
- *
- * @param key Key to be checked.
- * @param ttl Base value for timeout before checking starts.
- * @throws Exception If failed
- */
- private void checkKeyIsRetired(final String key, int ttl) throws Exception {
- assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
- @Override public boolean applyx() throws IgniteCheckedException {
- for (int i = 0; i < gridCount(); i++) {
- if (cache(i).get(key) != null) {
- info("Key is still in cache of grid " + i);
-
- return false;
- }
- }
-
- return true;
- }
- }, ttl * 4));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
index 6a21fe7..a7b970d 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
@@ -403,7 +403,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
Object transformClo,
String taskName,
IgnitePredicate<GridCacheEntry<K, V>>[] filter,
- @Nullable GridCacheAccessExpiryPolicy expiryPlc) {
+ @Nullable GridCacheExpiryPolicy expiryPlc) {
return val;
}
@@ -427,7 +427,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
@Nullable Object writeObj,
boolean writeThrough,
boolean retval,
- ExpiryPolicy expiryPlc,
+ @Nullable ExpiryPolicy expiryPlc,
boolean evt,
boolean metrics,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@@ -448,7 +448,7 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
@Nullable byte[] valBytes,
boolean writeThrough,
boolean retval,
- ExpiryPolicy expiryPlc,
+ @Nullable GridCacheExpiryPolicy expiryPlc,
boolean evt,
boolean metrics,
boolean primary,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedTtlSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedTtlSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedTtlSelfTest.java
deleted file mode 100644
index d74c04e..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCachePartitionedTtlSelfTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
-* __ ____/___________(_)______ /__ ____/______ ____(_)_______
-* _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
-* / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
-* \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
-*/
-
-package org.gridgain.grid.kernal.processors.cache.distributed.near;
-
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-
-import static org.gridgain.grid.cache.GridCacheMode.*;
-
-/**
- * Entry time-to-live test for partitioned cache.
- */
-public class GridCachePartitionedTtlSelfTest extends GridCacheAbstractTtlSelfTest {
- /** {@inheritDoc} */
- @Override protected GridCacheMode cacheMode() {
- return PARTITIONED;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedTtlSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedTtlSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedTtlSelfTest.java
deleted file mode 100644
index 1c71c4a..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedTtlSelfTest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
-* __ ____/___________(_)______ /__ ____/______ ____(_)_______
-* _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
-* / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
-* \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
-*/
-
-package org.gridgain.grid.kernal.processors.cache.distributed.replicated;
-
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import static org.gridgain.grid.cache.GridCacheMode.*;
-
-/**
- * Entry time-to-live test for replicated cache.
- */
-public class GridCacheReplicatedTtlSelfTest extends GridCacheAbstractTtlSelfTest {
- /** {@inheritDoc} */
- @Override protected GridCacheMode cacheMode() {
- return REPLICATED;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/local/GridCacheLocalTtlSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/local/GridCacheLocalTtlSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/local/GridCacheLocalTtlSelfTest.java
deleted file mode 100644
index 9c599c1..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/local/GridCacheLocalTtlSelfTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
-* __ ____/___________(_)______ /__ ____/______ ____(_)_______
-* _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
-* / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
-* \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
-*/
-
-package org.gridgain.grid.kernal.processors.cache.local;
-
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-
-import static org.gridgain.grid.cache.GridCacheMode.*;
-
-/**
- * Entry time-to-live test for local cache.
- */
-public class GridCacheLocalTtlSelfTest extends GridCacheAbstractTtlSelfTest {
- /** {@inheritDoc} */
- @Override protected GridCacheMode cacheMode() {
- return LOCAL;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
index e958f77..94dd70b 100644
--- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
@@ -149,7 +149,6 @@ public class GridDataGridTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridCachePartitionedTxSingleThreadedSelfTest.class));
suite.addTest(new TestSuite(GridCacheColocatedTxSingleThreadedSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedTxTimeoutSelfTest.class));
- suite.addTest(new TestSuite(GridCachePartitionedTtlSelfTest.class));
suite.addTest(new TestSuite(GridCacheFinishPartitionsSelfTest.class));
suite.addTest(new TestSuite(GridCacheDhtEntrySelfTest.class));
suite.addTest(new TestSuite(GridCacheDhtInternalEntrySelfTest.class));
@@ -242,7 +241,6 @@ public class GridDataGridTestSuite extends TestSuite {
suite.addTestSuite(GridCacheReplicatedEvictionEventSelfTest.class);
// TODO: GG-7569.
// suite.addTestSuite(GridCacheReplicatedTxMultiThreadedSelfTest.class);
- suite.addTestSuite(GridCacheReplicatedTtlSelfTest.class);
suite.addTestSuite(GridCacheReplicatedPreloadEventsSelfTest.class);
suite.addTestSuite(GridCacheReplicatedPreloadStartStopEventsSelfTest.class);
// TODO: GG-7434
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85de2471/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java
index 2883215..94bd4fb 100644
--- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java
@@ -32,6 +32,7 @@ import org.gridgain.testframework.junits.common.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -286,17 +287,10 @@ public abstract class GridCacheAbstractQuerySelfTest extends GridCommonAbstractT
* @throws Exception If failed.
*/
public void testExpiration() throws Exception {
- GridCache<String, Integer> cache = ignite.cache(null);
-
- GridCacheEntry<String, Integer> entry = cache.entry("key1");
-
- assert entry != null;
+ ignite.jcache(null).
+ withExpiryPolicy(new TouchedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 1000))).put("key1", 1);
- entry.timeToLive(1000);
-
- entry.set(1);
-
- assert entry.isCached();
+ GridCache<String, Integer> cache = ignite.cache(null);
GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, "1=1");