You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/23 09:22:39 UTC
[03/38] incubator-ignite git commit: # ignite-41
# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c2a51321
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c2a51321
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c2a51321
Branch: refs/heads/ignite-1
Commit: c2a513218cb832390bdc6b228eaddb10af2c27a4
Parents: 7f1b3f0
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 15 10:29:48 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 15 17:45:23 2014 +0300
----------------------------------------------------------------------
.../processors/cache/IgniteCacheProxy.java | 94 +++++----
.../processors/cache/GridCacheAdapter.java | 71 +++++--
.../processors/cache/GridCacheContext.java | 20 ++
.../processors/cache/GridCacheEntryEx.java | 5 +-
.../processors/cache/GridCacheMapEntry.java | 138 +++++++++++---
.../processors/cache/GridCacheProcessor.java | 2 +-
.../processors/cache/GridCacheProjectionEx.java | 12 ++
.../cache/GridCacheProjectionImpl.java | 83 ++++++--
.../processors/cache/GridCacheProxyImpl.java | 11 ++
.../distributed/GridCacheExpiryPolicy.java | 88 +++++++++
.../dht/atomic/GridDhtAtomicCache.java | 59 ++++--
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 27 ++-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 30 +--
.../dht/atomic/GridNearAtomicUpdateFuture.java | 17 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 37 ++--
.../atomic/GridNearAtomicUpdateResponse.java | 41 +---
.../distributed/near/GridNearAtomicCache.java | 19 +-
.../processors/cache/IgniteCacheTest.java | 2 +
.../expiry/IgniteCacheExpiryPolicyTest.java | 191 +++++++++++++++----
.../processors/cache/GridCacheTestEntryEx.java | 28 ++-
20 files changed, 749 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 5971d69..f1d27f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -13,7 +13,6 @@ import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.util.tostring.*;
@@ -36,25 +35,32 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** */
private static final long serialVersionUID = 0L;
- /** Context. */
+ /** */
private GridCacheContext<K, V> ctx;
/** Gateway. */
private GridCacheGateway<K, V> gate;
- /** Cache. */
+ /** Delegate. */
@GridToStringInclude
- private GridCacheAdapter<K, V> delegate;
+ private GridCacheProjectionEx<K, V> delegate;
+
+ /** Projection. */
+ private GridCacheProjectionImpl<K, V> prj;
/**
* @param delegate Delegate.
+ * @param prj Projection.
*/
- public IgniteCacheProxy(GridCacheAdapter<K, V> delegate) {
+ public IgniteCacheProxy(GridCacheContext<K, V> ctx,
+ GridCacheProjectionEx<K, V> delegate,
+ @Nullable GridCacheProjectionImpl<K, V> prj) {
+ assert ctx != null;
assert delegate != null;
+ this.ctx = ctx;
this.delegate = delegate;
-
- ctx = delegate.context();
+ this.prj = prj;
gate = ctx.gate();
}
@@ -73,8 +79,16 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
- // TODO IGNITE-1.
- throw new UnsupportedOperationException();
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ GridCacheProjectionEx<K, V> prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc);
+
+ return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0);
+ }
+ finally {
+ gate.leave(prev);
+ }
}
/** {@inheritDoc} */
@@ -92,7 +106,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.putIfAbsent(key, val);
@@ -126,7 +140,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public boolean isLocked(K key) {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.isLocked(key);
@@ -138,7 +152,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public boolean isLockedByThread(K key) {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.isLockedByThread(key);
@@ -162,7 +176,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public void localEvict(Collection<? extends K> keys) {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
delegate.evictAll(keys);
@@ -175,13 +189,23 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) {
// TODO IGNITE-1.
- throw new UnsupportedOperationException();
+ if (peekModes.length != 0)
+ throw new UnsupportedOperationException();
+
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.peek(key);
+ }
+ finally {
+ gate.leave(prev);
+ }
}
/** {@inheritDoc} */
@Override public void localPromote(Set<? extends K> keys) throws CacheException {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
delegate.promoteAll(keys);
@@ -216,7 +240,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public V get(K key) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.get(key);
@@ -233,7 +257,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public Map<K, V> getAll(Set<? extends K> keys) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.getAll(keys);
@@ -264,10 +288,10 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public void put(K key, V val) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
- delegate.putx(key, val, null);
+ delegate.putx(key, val);
}
finally {
gate.leave(prev);
@@ -281,7 +305,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public V getAndPut(K key, V val) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.put(key, val);
@@ -298,10 +322,10 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public void putAll(Map<? extends K, ? extends V> map) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
- delegate.putAll(map, null);
+ delegate.putAll(map);
}
finally {
gate.leave(prev);
@@ -315,7 +339,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public boolean putIfAbsent(K key, V val) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.putxIfAbsent(key, val);
@@ -332,7 +356,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public boolean remove(K key) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.removex(key);
@@ -349,7 +373,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public boolean remove(K key, V oldVal) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.remove(key, oldVal);
@@ -366,10 +390,10 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public V getAndRemove(K key) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
- return delegate.remove(key, (GridCacheEntryEx<K, V>)null);
+ return delegate.remove(key);
}
finally {
gate.leave(prev);
@@ -383,7 +407,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public boolean replace(K key, V oldVal, V newVal) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.replace(key, oldVal, newVal);
@@ -400,7 +424,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public boolean replace(K key, V val) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.replacex(key, val);
@@ -417,7 +441,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public V getAndReplace(K key, V val) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.replace(key, val);
@@ -434,7 +458,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public void removeAll(Set<? extends K> keys) {
try {
- GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
delegate.removeAll(keys);
@@ -581,15 +605,17 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(ctx);
out.writeObject(delegate);
+ out.writeObject(prj);
}
/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- delegate = (GridCacheAdapter<K, V>)in.readObject();
-
- ctx = delegate.context();
+ ctx = (GridCacheContext<K, V>)in.readObject();
+ delegate = (GridCacheProjectionEx<K, V>)in.readObject();
+ prj = (GridCacheProjectionImpl<K, V>)in.readObject();
gate = ctx.gate();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index bf40a85..559949f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -40,6 +40,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -52,8 +53,6 @@ import static org.gridgain.grid.cache.GridCacheFlag.*;
import static org.gridgain.grid.cache.GridCachePeekMode.*;
import static org.gridgain.grid.cache.GridCacheTxConcurrency.*;
import static org.gridgain.grid.cache.GridCacheTxIsolation.*;
-import static org.gridgain.grid.cache.GridCacheTxState.*;
-import static org.apache.ignite.events.IgniteEventType.*;
import static org.gridgain.grid.kernal.GridClosureCallMode.*;
import static org.gridgain.grid.kernal.processors.dr.GridDrType.*;
import static org.gridgain.grid.kernal.processors.task.GridTaskThreadContextKey.*;
@@ -367,8 +366,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
/** {@inheritDoc} */
@Override public GridCacheProjectionEx<K, V> forSubjectId(UUID subjId) {
- GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, ctx, null, null,
- null, subjId, false);
+ GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+ ctx,
+ null,
+ null,
+ null,
+ subjId,
+ false,
+ null);
return new GridCacheProxyImpl<>(ctx, prj, prj);
}
@@ -378,8 +383,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
if (F.isEmpty(flags))
return this;
- GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, ctx, null, null,
- EnumSet.copyOf(F.asList(flags)), null, false);
+ GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+ ctx,
+ null,
+ null,
+ EnumSet.copyOf(F.asList(flags)),
+ null,
+ false,
+ null);
return new GridCacheProxyImpl<>(ctx, prj, prj);
}
@@ -398,12 +409,31 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
null,
null,
null,
- ctx.portableEnabled());
+ ctx.portableEnabled(),
+ null);
return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj);
}
/** {@inheritDoc} */
+ @Nullable @Override public ExpiryPolicy expiry() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+ return new GridCacheProjectionImpl<>(
+ this,
+ ctx,
+ null,
+ null,
+ null,
+ null,
+ ctx.portableEnabled(),
+ plc);
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings({"unchecked", "RedundantCast"})
@Override public <K1, V1> GridCacheProjection<K1, V1> projection(
Class<? super K1> keyType,
@@ -423,8 +453,13 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
}
GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>((GridCacheProjection<K1, V1>)this,
- (GridCacheContext<K1, V1>)ctx, CU.<K1, V1>typeFilter(keyType, valType), /*filter*/null, /*flags*/null,
- /*clientId*/null, false);
+ (GridCacheContext<K1, V1>)ctx,
+ CU.<K1, V1>typeFilter(keyType, valType),
+ /*filter*/null,
+ /*flags*/null,
+ /*clientId*/null,
+ false,
+ null);
return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj);
}
@@ -443,7 +478,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
}
}
- GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, ctx, p, null, null, null, false);
+ GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+ ctx,
+ p,
+ null,
+ null,
+ null,
+ false,
+ null);
return new GridCacheProxyImpl<>(ctx, prj, prj);
}
@@ -463,7 +505,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
}
GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(
- this, ctx, null, filter, null, null, false);
+ this,
+ ctx,
+ null,
+ filter,
+ null,
+ null,
+ false,
+ null);
return new GridCacheProxyImpl<>(ctx, prj, prj);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
index c6cb355..58fd1cd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
@@ -46,6 +46,8 @@ import org.gridgain.grid.util.offheap.unsafe.*;
import org.gridgain.grid.util.tostring.*;
import org.jetbrains.annotations.*;
+import javax.cache.configuration.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -170,6 +172,9 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Cache ID. */
private int cacheId;
+ /** */
+ private ExpiryPolicy expiryPlc;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -275,6 +280,20 @@ public class GridCacheContext<K, V> implements Externalizable {
}
else
cacheId = 1;
+
+ Factory<ExpiryPolicy> factory = cacheCfg.getExpiryPolicyFactory();
+
+ expiryPlc = factory.create();
+
+ if (expiryPlc instanceof EternalExpiryPolicy)
+ expiryPlc = null;
+ }
+
+ /**
+ * @return Cache default {@link ExpiryPolicy}.
+ */
+ @Nullable public ExpiryPolicy expiry() {
+ return expiryPlc;
}
/**
@@ -1054,6 +1073,7 @@ public class GridCacheContext<K, V> implements Externalizable {
/**
* Gets thread local projection.
+ *
* @return Projection per call.
*/
public GridCacheProjectionImpl<K, V> projectionPerCall() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index e94016f..fc8aaaa 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -18,6 +18,7 @@ import org.gridgain.grid.kernal.processors.dr.*;
import org.gridgain.grid.util.lang.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.util.*;
/**
@@ -390,7 +391,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
* @param valBytes Value bytes. Can be non-null only if operation is UPDATE.
* @param writeThrough Write through flag.
* @param retval Return value flag.
- * @param ttl Time to live.
+ * @param expiryPlc Expiry policy.
* @param evt Event flag.
* @param metrics Metrics update flag.
* @param primary If update is performed on primary node (the one which assigns version).
@@ -422,7 +423,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
@Nullable byte[] valBytes,
boolean writeThrough,
boolean retval,
- long ttl,
+ @Nullable ExpiryPolicy expiryPlc,
boolean evt,
boolean metrics,
boolean primary,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index 32c1485..b723d71 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -27,6 +27,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import sun.misc.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.nio.*;
import java.util.*;
@@ -1615,6 +1616,38 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
return new IgniteBiTuple<>(res, cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old));
}
+ /**
+ * @param expiryPlc Expiry policy.
+ * @param isNew {@code True} if entry is new.
+ * @return TTL.
+ */
+ private static long ttlFromPolicy(@Nullable ExpiryPolicy expiryPlc, boolean isNew) {
+ if (expiryPlc == null)
+ return -1L;
+
+ Duration duration = isNew ? expiryPlc.getExpiryForCreation() : expiryPlc.getExpiryForUpdate();
+
+ return toTtl(duration);
+ }
+
+ private static long toTtl(Duration duration) {
+ if (duration == null)
+ return -1;
+
+ if (duration.getDurationAmount() == 0) {
+ if (duration.isEternal())
+ return 0;
+
+ assert duration.isZero();
+
+ return 1L;
+ }
+
+ assert duration.getTimeUnit() != null;
+
+ return duration.getTimeUnit().toMillis(duration.getDurationAmount());
+ }
+
/** {@inheritDoc} */
@Override public GridCacheUpdateAtomicResult<K, V> innerUpdate(
GridCacheVersion newVer,
@@ -1625,7 +1658,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
@Nullable byte[] valBytes,
boolean writeThrough,
boolean retval,
- long ttl,
+ @Nullable ExpiryPolicy expiryPlc,
boolean evt,
boolean metrics,
boolean primary,
@@ -1668,7 +1701,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
Object transformClo = null;
if (drResolve) {
- drRes = cctx.dr().resolveAtomic(this, op, writeObj, valBytes, ttl, drTtl, drExpireTime, drVer);
+ drRes = cctx.dr().resolveAtomic(this,
+ op,
+ writeObj,
+ valBytes,
+ ttlFromPolicy(expiryPlc, isNew()),
+ drTtl,
+ drExpireTime,
+ drVer);
if (drRes != null) {
if (drRes.isUseOld()) {
@@ -1730,26 +1770,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
"Invalid version for inner update [entry=" + this + ", newVer=" + newVer + ']';
}
- if (drRes == null) {
- // Calculate TTL and expire time for local update.
- if (drTtl >= 0L) {
- assert drExpireTime >= 0L;
-
- newTtl = drTtl;
- newExpireTime = drExpireTime;
- }
- else {
- assert drExpireTime == -1L;
-
- newTtl = ttl;
-
- if (newTtl < 0)
- newTtl = ttlExtras();
-
- newExpireTime = toExpireTime(newTtl);
- }
- }
-
// Possibly get old value form store.
old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
@@ -1777,7 +1797,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
if (!F.isEmptyOrNulls(filter)) {
boolean pass = cctx.isAll(wrapFilterLocked(), filter);
- if (!pass)
+ if (!pass) {
+ if (!isNew() && expiryPlc != null) {
+ Duration duration = expiryPlc.getExpiryForAccess();
+
+ if (duration != null)
+ updateTtl(toTtl(duration));
+ }
+
return new GridCacheUpdateAtomicResult<>(false,
retval ? old : null,
null,
@@ -1786,6 +1813,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
null,
null,
false);
+ }
}
// Apply metrics.
@@ -1841,6 +1869,46 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
}
}
+ if (drRes == null) {
+ // Calculate TTL and expire time for local update.
+ if (drTtl >= 0L) {
+ assert drExpireTime >= 0L;
+
+ newTtl = drTtl;
+ newExpireTime = drExpireTime;
+ }
+ else {
+ assert drExpireTime == -1L;
+
+ if (expiryPlc != null) {
+ if (!hadVal) {
+ Duration duration = expiryPlc.getExpiryForCreation();
+
+ if (duration != null && duration.isZero())
+ return new GridCacheUpdateAtomicResult<>(false,
+ retval ? old : null,
+ null,
+ 0L,
+ -1L,
+ null,
+ null,
+ false);
+
+ newTtl = toTtl(duration);
+ }
+ else
+ newTtl = toTtl(expiryPlc.getExpiryForUpdate());
+ }
+ else
+ newTtl = -1L;
+
+ if (newTtl < 0)
+ newTtl = ttlExtras();
+
+ newExpireTime = toExpireTime(newTtl);
+ }
+ }
+
// Try write-through.
if (writeThrough)
// Must persist inside synchronization in non-tx mode.
@@ -1859,7 +1927,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
assert !deletedUnlocked() : "Invalid entry [entry=" + this +
", locNodeId=" + cctx.localNodeId() + ']';
- // Do not change size;
+ // Do not change size.
}
if (cctx.portableEnabled())
@@ -2393,6 +2461,28 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
}
/**
+ * @param ttl Time to live.
+ */
+ private void updateTtl(long ttl) {
+ assert Thread.holdsLock(this);
+
+ if (ttl == -1L)
+ return;
+
+ long expireTime = toExpireTime(ttl);
+
+ long oldExpireTime = expireTimeExtras();
+
+ if (oldExpireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl())
+ cctx.ttl().removeTrackedEntry(this);
+
+ ttlAndExpireTimeExtras(ttl, expireTime);
+
+ if (expireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl())
+ cctx.ttl().addTrackedEntry(this);
+ }
+
+ /**
* @return {@code true} If value bytes should be stored.
*/
protected boolean isStoreValueBytes() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
index a083805..5dd98b3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
@@ -1596,7 +1596,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cache == null)
throw new IllegalArgumentException("Cache is not configured: " + name);
- return new IgniteCacheProxy<>(cache);
+ return new IgniteCacheProxy<>(cache.context(), cache, null);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
index d8fbdfe..2544eb9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
@@ -16,6 +16,7 @@ import org.gridgain.grid.cache.store.*;
import org.gridgain.grid.kernal.processors.cache.dr.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.util.*;
/**
@@ -380,4 +381,15 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> {
* @return Primary entry set.
*/
public Set<GridCacheEntry<K, V>> primaryEntrySetx(IgnitePredicate<GridCacheEntry<K, V>>... filter);
+
+ /**
+ * @return {@link ExpiryPolicy} associated with this projection.
+ */
+ public @Nullable ExpiryPolicy expiry();
+
+ /**
+ * @param plc {@link ExpiryPolicy} to associate with this projection.
+ * @return New projection based on this one, but with the specified expiry policy.
+ */
+ public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
index 67eb9e8..9a73e64 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
@@ -24,6 +24,7 @@ import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -73,6 +74,9 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** */
private boolean keepPortable;
+ /** */
+ private ExpiryPolicy expiryPlc;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -95,7 +99,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
@Nullable IgnitePredicate<? super GridCacheEntry<K, V>> entryFilter,
@Nullable Set<GridCacheFlag> flags,
@Nullable UUID subjId,
- boolean keepPortable) {
+ boolean keepPortable,
+ @Nullable ExpiryPolicy expiryPlc) {
assert parent != null;
assert cctx != null;
@@ -125,6 +130,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
qry = new GridCacheQueriesImpl<>(cctx, this);
this.keepPortable = keepPortable;
+
+ this.expiryPlc = expiryPlc;
}
/**
@@ -367,8 +374,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
@Override public GridCacheProjectionEx<K, V> forSubjectId(UUID subjId) {
A.notNull(subjId, "subjId");
- GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, noNullKvFilter.kvFilter,
- noNullEntryFilter.entryFilter, flags, subjId, keepPortable);
+ GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+ cctx,
+ noNullKvFilter.kvFilter,
+ noNullEntryFilter.entryFilter,
+ flags,
+ subjId,
+ keepPortable,
+ expiryPlc);
return new GridCacheProxyImpl<>(cctx, prj, prj);
}
@@ -415,7 +428,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
(IgnitePredicate<GridCacheEntry>)noNullEntryFilter.entryFilter,
flags,
subjId,
- keepPortable);
+ keepPortable,
+ expiryPlc);
return new GridCacheProxyImpl((GridCacheContext<K1, V1>)cctx, prj, prj);
}
@@ -439,8 +453,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
}
}
- GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, kvFilter,
- noNullEntryFilter.entryFilter, flags, subjId, keepPortable);
+ GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+ cctx,
+ kvFilter,
+ noNullEntryFilter.entryFilter,
+ flags,
+ subjId,
+ keepPortable,
+ expiryPlc);
return new GridCacheProxyImpl<>(cctx, prj, prj);
}
@@ -463,8 +483,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
}
}
- GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, noNullKvFilter.kvFilter,
- filter, flags, subjId, keepPortable);
+ GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+ cctx,
+ noNullKvFilter.kvFilter,
+ filter,
+ flags,
+ subjId,
+ keepPortable,
+ expiryPlc);
return new GridCacheProxyImpl<>(cctx, prj, prj);
}
@@ -482,8 +508,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
res.addAll(EnumSet.copyOf(F.asList(flags)));
- GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, noNullKvFilter.kvFilter,
- noNullEntryFilter.entryFilter, res, subjId, keepPortable);
+ GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+ cctx,
+ noNullKvFilter.kvFilter,
+ noNullEntryFilter.entryFilter,
+ res,
+ subjId,
+ keepPortable,
+ expiryPlc);
return new GridCacheProxyImpl<>(cctx, prj, prj);
}
@@ -500,8 +532,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
res.removeAll(EnumSet.copyOf(F.asList(flags)));
- GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, noNullKvFilter.kvFilter,
- noNullEntryFilter.entryFilter, res, subjId, keepPortable);
+ GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+ cctx,
+ noNullKvFilter.kvFilter,
+ noNullEntryFilter.entryFilter,
+ res,
+ subjId,
+ keepPortable,
+ expiryPlc);
return new GridCacheProxyImpl<>(cctx, prj, prj);
}
@@ -516,7 +554,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
(IgnitePredicate<GridCacheEntry>)noNullEntryFilter.entryFilter,
flags,
subjId,
- true);
+ true,
+ expiryPlc);
return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)cctx, prj, prj);
}
@@ -1242,6 +1281,24 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
}
/** {@inheritDoc} */
+ @Override public @Nullable ExpiryPolicy expiry() {
+ // Returns the expiry policy stored on this projection; the @Nullable return type allows null.
+ return expiryPlc;
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Builds a child projection that reuses this projection's key-value filter, entry filter,
+     * flags, subject ID and keep-portable mode, and differs only in the expiry policy.
+     *
+     * @param plc Expiry policy for the new projection.
+     * @return New projection with the given expiry policy.
+     */
+    @Override public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+        return new GridCacheProjectionImpl<>(
+            this,
+            cctx,
+            noNullKvFilter.kvFilter,
+            noNullEntryFilter.entryFilter,
+            flags,
+            subjId,
+            // Inherit the current mode: a hard-coded 'true' would silently switch on keep-portable.
+            keepPortable,
+            plc);
+    }
+
+ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(cctx);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
index 8b6ade8..38cc2ca 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
@@ -24,6 +24,7 @@ import org.gridgain.grid.util.tostring.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -1879,6 +1880,16 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
+ @Nullable @Override public ExpiryPolicy expiry() {
+ // Unconditionally rejected: this implementation never returns an expiry policy.
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+ // Unconditionally rejected: the 'plc' argument is ignored and no projection is created.
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheProxyImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
new file mode 100644
index 0000000..f7fe27a
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
@@ -0,0 +1,88 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed;
+
+import javax.cache.expiry.*;
+import java.io.*;
+import java.util.concurrent.*;
+
+/**
+ * {@link Externalizable} carrier for the creation and update durations of an {@link ExpiryPolicy}.
+ * <p>
+ * On the writing side the instance wraps a policy and serializes the durations that policy
+ * reports. On the reading side the durations are restored into {@code forCreate} and
+ * {@code forUpdate} and exposed through {@link #getExpiryForCreation()} and
+ * {@link #getExpiryForUpdate()}. Access expiry is not transferred.
+ * <p>
+ * Wire format: one flags byte ({@code CREATE_TTL_MASK}, {@code UPDATE_TTL_MASK}) followed by one
+ * {@code long} per set flag, in that order. Each {@code long} is the duration in milliseconds,
+ * or {@code ETERNAL_TTL} for an eternal duration.
+ */
+public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
+    /** Wire value for an eternal duration; real durations are never negative. */
+    private static final long ETERNAL_TTL = -1L;
+
+    /** Flag: a creation duration follows the flags byte. */
+    private static final byte CREATE_TTL_MASK = 0x01;
+
+    /** Flag: an update duration follows the flags byte (after the creation one, if present). */
+    private static final byte UPDATE_TTL_MASK = 0x02;
+
+    /** Wrapped policy; {@code null} on an instance created through deserialization. */
+    private ExpiryPolicy plc;
+
+    /** Creation duration restored by {@link #readExternal(ObjectInput)}. */
+    private Duration forCreate;
+
+    /** Update duration restored by {@link #readExternal(ObjectInput)}. */
+    private Duration forUpdate;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridCacheExpiryPolicy() {
+        // No-op.
+    }
+
+    /**
+     * @param plc Expiry policy.
+     */
+    public GridCacheExpiryPolicy(ExpiryPolicy plc) {
+        assert plc != null;
+
+        this.plc = plc;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Creation duration read from the stream, or {@code null} if none was read.
+     */
+    @Override public Duration getExpiryForCreation() {
+        return forCreate;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Access expiry is not carried by this class; calling this method trips an assertion.
+     */
+    @Override public Duration getExpiryForAccess() {
+        assert false;
+
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Update duration read from the stream, or {@code null} if none was read.
+     */
+    @Override public Duration getExpiryForUpdate() {
+        return forUpdate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // A deserialized instance has no wrapped policy, so fall back to the restored durations.
+        Duration create = plc != null ? plc.getExpiryForCreation() : forCreate;
+        Duration update = plc != null ? plc.getExpiryForUpdate() : forUpdate;
+
+        byte flags = 0;
+
+        if (create != null)
+            flags |= CREATE_TTL_MASK;
+
+        if (update != null)
+            flags |= UPDATE_TTL_MASK;
+
+        out.writeByte(flags);
+
+        // Values must follow the flags, otherwise readExternal() reads past what was written.
+        if (create != null)
+            out.writeLong(toWireTtl(create));
+
+        if (update != null)
+            out.writeLong(toWireTtl(update));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        byte flags = in.readByte();
+
+        forCreate = (flags & CREATE_TTL_MASK) != 0 ? fromWireTtl(in.readLong()) : null;
+        forUpdate = (flags & UPDATE_TTL_MASK) != 0 ? fromWireTtl(in.readLong()) : null;
+    }
+
+    /**
+     * @param duration Duration, not {@code null}.
+     * @return Duration in milliseconds, or {@code ETERNAL_TTL} for an eternal duration.
+     */
+    private static long toWireTtl(Duration duration) {
+        // An eternal duration has no time unit, so it cannot be converted to milliseconds.
+        if (duration.isEternal())
+            return ETERNAL_TTL;
+
+        return duration.getTimeUnit().toMillis(duration.getDurationAmount());
+    }
+
+    /**
+     * @param ttl Value produced by {@link #toWireTtl(Duration)}.
+     * @return Corresponding duration.
+     */
+    private static Duration fromWireTtl(long ttl) {
+        return ttl == ETERNAL_TTL ? Duration.ETERNAL : new Duration(TimeUnit.MILLISECONDS, ttl);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4991adb..fd2d98d 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -33,6 +33,7 @@ import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
import sun.misc.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -594,7 +595,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.checkSecurity(GridSecurityPermission.CACHE_PUT);
- UUID subjId = ctx.subjectIdPerCall(null);
+ GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+
+ UUID subjId = ctx.subjectIdPerCall(null); // TODO IGNITE-41.
int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
@@ -611,7 +614,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
retval,
rawRetval,
cached,
- ttl,
+ prj != null ? prj.expiry() : null,
filter,
subjId,
taskNameHash);
@@ -651,7 +654,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.checkSecurity(GridSecurityPermission.CACHE_REMOVE);
- UUID subjId = ctx.subjectIdPerCall(null);
+ GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+
+ UUID subjId = ctx.subjectIdPerCall(null); // TODO IGNITE-41.
int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
@@ -667,7 +672,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
retval,
rawRetval,
cached,
- 0,
+ (filter != null && prj != null) ? prj.expiry() : null,
filter,
subjId,
taskNameHash);
@@ -897,7 +902,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean replicate = ctx.isDrEnabled();
- if (storeEnabled() && keys.size() > 1 && ctx.dr().receiveEnabled()) {
+ if (storeEnabled() && keys.size() > 1 && !ctx.dr().receiveEnabled()) {
// This method can only be used when there are no replicated entries in the batch.
UpdateBatchResult<K, V> updRes = updateWithBatch(node, hasNear, req, res, locked, ver,
dhtFut, completionCb, replicate, taskName);
@@ -1023,6 +1028,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
if (!checkFilter(entry, req, res)) {
+ // TODO IGNITE-41 update TTL.
+
if (log.isDebugEnabled())
log.debug("Entry did not pass the filter (will skip write) [entry=" + entry +
", filter=" + Arrays.toString(req.filter()) + ", res=" + res + ']');
@@ -1284,6 +1291,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
+ ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
+
// Avoid iterator creation.
for (int i = 0; i < keys.size(); i++) {
K k = keys.get(i);
@@ -1331,7 +1340,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
newValBytes,
primary && storeEnabled(),
req.returnValue(),
- req.ttl(),
+ expiry,
true,
true,
primary,
@@ -1372,14 +1381,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
transformC = (IgniteClosure<V, V>)writeVal;
if (!readersOnly)
- dhtFut.addWriteEntry(entry, updRes.newValue(), newValBytes, transformC,
- drExpireTime >= 0L ? ttl : -1L, drExpireTime, newDrVer, drExpireTime < 0L ? ttl : 0L);
+ dhtFut.addWriteEntry(entry,
+ updRes.newValue(),
+ newValBytes,
+ transformC,
+ drExpireTime >= 0L ? ttl : -1L,
+ drExpireTime,
+ newDrVer,
+ drExpireTime < 0L ? req.expiry() : null);
if (!F.isEmpty(filteredReaders))
dhtFut.addNearWriteEntries(filteredReaders, entry, updRes.newValue(), newValBytes,
- transformC, drExpireTime < 0L ? ttl : 0L);
+ transformC, drExpireTime < 0L ? req.expiry() : null);
}
else {
+ // TODO IGNITE-41 ttl could be changed.
+
if (log.isDebugEnabled())
log.debug("Entry did not pass the filter or conflict resolution (will skip write) " +
"[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
@@ -1391,7 +1408,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
GridDrResolveResult<V> ctx = updRes.drResolveResult();
- res.nearTtl(updRes.newTtl());
+ // TODO IGNITE-41 dr ttl for near cache.
if (ctx != null && ctx.isMerge())
newValBytes = null;
@@ -1524,6 +1541,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
+ ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
+
// Avoid iterator creation.
for (int i = 0; i < entries.size(); i++) {
GridDhtCacheEntry<K, V> entry = entries.get(i);
@@ -1562,12 +1581,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
false,
false,
- req.ttl(),
+ expiry,
true,
true,
primary,
ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
- req.filter(),
+ req.filter(), // TODO IGNITE-41 filter already evaluated?
replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
-1L,
-1L,
@@ -1605,11 +1624,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
IgniteClosure<V, V> transformC = transformMap == null ? null : transformMap.get(entry.key());
if (!batchRes.readersOnly())
- dhtFut.addWriteEntry(entry, writeVal, valBytes, transformC, -1, -1, null, req.ttl());
+ dhtFut.addWriteEntry(entry, writeVal, valBytes, transformC, -1, -1, null, req.expiry());
if (!F.isEmpty(filteredReaders))
dhtFut.addNearWriteEntries(filteredReaders, entry, writeVal, valBytes, transformC,
- req.ttl());
+ req.expiry());
}
if (hasNear) {
@@ -1625,8 +1644,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res.addNearValue(idx, writeVal, valBytes);
}
- res.nearTtl(req.ttl());
-
if (writeVal != null || !entry.valueBytes().isNull()) {
IgniteFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
@@ -1861,9 +1878,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
drPutVals = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- Long ttl = req.drTtl(i);
+ long ttl = req.drTtl(i);
- if (ttl == null)
+ if (ttl == -1L)
drPutVals.add(new GridCacheDrInfo<>(req.value(i), req.drVersion(i)));
else
drPutVals.add(new GridCacheDrExpirationInfo<>(req.value(i), req.drVersion(i), ttl,
@@ -1894,7 +1911,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.returnValue(),
false,
null,
- req.ttl(),
+ req.expiry(),
req.filter(),
req.subjectId(),
req.taskNameHash());
@@ -2020,6 +2037,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+ ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
+
for (int i = 0; i < req.size(); i++) {
K key = req.key(i);
@@ -2048,7 +2067,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
valBytes,
/*write-through*/false,
/*retval*/false,
- req.ttl(),
+ expiry,
/*event*/true,
/*metrics*/true,
/*primary*/false,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 982c777..3c7da7b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -23,6 +23,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -201,10 +202,16 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
* @param drTtl DR TTL (optional).
* @param drExpireTime DR expire time (optional).
* @param drVer DR version (optional).
- * @param ttl Time to live.
+ * @param expiryPlc Expiry policy.
*/
- public void addWriteEntry(GridDhtCacheEntry<K, V> entry, @Nullable V val, @Nullable byte[] valBytes,
- IgniteClosure<V, V> transformC, long drTtl, long drExpireTime, @Nullable GridCacheVersion drVer, long ttl) {
+ public void addWriteEntry(GridDhtCacheEntry<K, V> entry,
+ @Nullable V val,
+ @Nullable byte[] valBytes,
+ IgniteClosure<V, V> transformC,
+ long drTtl,
+ long drExpireTime,
+ @Nullable GridCacheVersion drVer,
+ @Nullable ExpiryPolicy expiryPlc) {
long topVer = updateReq.topologyVersion();
Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
@@ -230,7 +237,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
writeVer,
syncMode,
topVer,
- ttl,
+ expiryPlc,
forceTransformBackups,
this.updateReq.subjectId(),
this.updateReq.taskNameHash());
@@ -249,10 +256,14 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
* @param entry Entry.
* @param val Value.
* @param valBytes Value bytes.
- * @param ttl Time to live.
+ * @param expiryPlc Expiry policy.
*/
- public void addNearWriteEntries(Iterable<UUID> readers, GridDhtCacheEntry<K, V> entry, @Nullable V val,
- @Nullable byte[] valBytes, IgniteClosure<V, V> transformC, long ttl) {
+ public void addNearWriteEntries(Iterable<UUID> readers,
+ GridDhtCacheEntry<K, V> entry,
+ @Nullable V val,
+ @Nullable byte[] valBytes,
+ IgniteClosure<V, V> transformC,
+ @Nullable ExpiryPolicy expiryPlc) {
GridCacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
keys.add(entry.key());
@@ -276,7 +287,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
writeVer,
syncMode,
topVer,
- ttl,
+ expiryPlc,
forceTransformBackups,
this.updateReq.subjectId(),
this.updateReq.taskNameHash());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index ad3f8da..c3b0918 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -20,6 +20,7 @@ import org.gridgain.grid.util.tostring.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.nio.*;
import java.util.*;
@@ -79,8 +80,11 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
/** Write synchronization mode. */
private GridCacheWriteSynchronizationMode syncMode;
- /** Time to live. */
- private long ttl;
+ /** Expiry policy. */
+ private ExpiryPolicy expiryPlc;
+
+ /** Expiry policy bytes. */
+ private byte[] expiryPlcBytes;
/** Keys to update. */
@GridToStringInclude
@@ -150,7 +154,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
* @param writeVer Write version for cache values.
* @param syncMode Cache write synchronization mode.
* @param topVer Topology version.
- * @param ttl Time to live.
+ * @param expiryPlc Expiry policy.
* @param forceTransformBackups Force transform backups flag.
* @param subjId Subject ID.
*/
@@ -161,7 +165,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
GridCacheVersion writeVer,
GridCacheWriteSynchronizationMode syncMode,
long topVer,
- long ttl,
+ @Nullable ExpiryPolicy expiryPlc,
boolean forceTransformBackups,
UUID subjId,
int taskNameHash
@@ -171,7 +175,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
this.futVer = futVer;
this.writeVer = writeVer;
this.syncMode = syncMode;
- this.ttl = ttl;
+ this.expiryPlc = expiryPlc;
this.topVer = topVer;
this.forceTransformBackups = forceTransformBackups;
this.subjId = subjId;
@@ -360,10 +364,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
}
/**
- * @return Time to live.
+ * @return Expiry policy.
*/
- public long ttl() {
- return ttl;
+ @Nullable public ExpiryPolicy expiry() {
+ return expiryPlc;
}
/**
@@ -621,7 +625,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
_clone.drTtls = drTtls;
_clone.drExpireTimes = drExpireTimes;
_clone.syncMode = syncMode;
- _clone.ttl = ttl;
+ _clone.expiryPlc = expiryPlc;
_clone.nearKeys = nearKeys;
_clone.nearKeyBytes = nearKeyBytes;
_clone.nearVals = nearVals;
@@ -742,7 +746,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
case 11:
- if (!commState.putLong(ttl))
+ if (!commState.putByteArray(expiryPlcBytes))
return false;
commState.idx++;
@@ -1037,10 +1041,12 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
case 11:
- if (buf.remaining() < 8)
+ byte[] expiryPlcBytes0 = commState.getByteArray();
+
+ if (expiryPlcBytes0 == BYTE_ARR_NOT_READ)
return false;
- ttl = commState.getLong();
+ expiryPlcBytes = expiryPlcBytes0;
commState.idx++;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index d660112..4750462 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -93,8 +94,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
/** Cached entry if keys size is 1. */
private GridCacheEntryEx<K, V> cached;
- /** Time to live. */
- private final long ttl;
+ /** Expiry policy. */
+ private final ExpiryPolicy expiryPlc;
/** Future map topology version. */
private long topVer;
@@ -141,7 +142,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
futVer = null;
retval = false;
fastMap = false;
- ttl = 0;
+ expiryPlc = null;
filter = null;
syncMode = null;
op = null;
@@ -162,7 +163,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
* @param retval Return value require flag.
* @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
* @param cached Cached entry if keys size is 1.
- * @param ttl Time to live.
+ * @param expiryPlc Expiry policy.
* @param filter Entry filter.
*/
public GridNearAtomicUpdateFuture(
@@ -177,7 +178,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
final boolean retval,
final boolean rawRetval,
@Nullable GridCacheEntryEx<K, V> cached,
- long ttl,
+ @Nullable ExpiryPolicy expiryPlc,
final IgnitePredicate<GridCacheEntry<K, V>>[] filter,
UUID subjId,
int taskNameHash
@@ -201,7 +202,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
this.drRmvVals = drRmvVals;
this.retval = retval;
this.cached = cached;
- this.ttl = ttl;
+ this.expiryPlc = expiryPlc;
this.filter = filter;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
@@ -556,7 +557,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
op,
retval,
op == TRANSFORM && cctx.hasFlag(FORCE_TRANSFORM_BACKUP),
- ttl,
+ expiryPlc,
filter,
subjId,
taskNameHash);
@@ -662,7 +663,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
op,
retval,
op == TRANSFORM && cctx.hasFlag(FORCE_TRANSFORM_BACKUP),
- ttl,
+ expiryPlc,
filter,
subjId,
taskNameHash);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 00512ff..3eca7e2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -14,12 +14,14 @@ import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
import org.gridgain.grid.util.*;
import org.gridgain.grid.util.direct.*;
import org.gridgain.grid.util.tostring.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.nio.*;
import java.util.*;
@@ -88,8 +90,11 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
/** Return value flag. */
private boolean retval;
- /** Time to live. */
- private long ttl;
+ /** Expiry policy. */
+ private ExpiryPolicy expiryPlc;
+
+ /** Expiry policy bytes. */
+ private byte[] expiryPlcBytes;
/** Filter. */
@GridDirectTransient
@@ -132,7 +137,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
* @param syncMode Synchronization mode.
* @param op Cache update operation.
* @param retval Return value required flag.
- * @param ttl Time to live.
+ * @param expiryPlc Expiry policy.
* @param filter Optional filter for atomic check.
*/
public GridNearAtomicUpdateRequest(
@@ -146,7 +151,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
GridCacheOperation op,
boolean retval,
boolean forceTransformBackups,
- long ttl,
+ ExpiryPolicy expiryPlc,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@Nullable UUID subjId,
int taskNameHash
@@ -162,7 +167,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
this.op = op;
this.retval = retval;
this.forceTransformBackups = forceTransformBackups;
- this.ttl = ttl;
+ this.expiryPlc = expiryPlc;
this.filter = filter;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
@@ -240,10 +245,10 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
}
/**
- * @return Time to live.
+ * @return Expiry policy.
*/
- public long ttl() {
- return ttl;
+ public ExpiryPolicy expiry() {
+ return expiryPlc;
}
/**
@@ -485,6 +490,9 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
keyBytes = marshalCollection(keys, ctx);
valBytes = marshalValuesCollection(vals, ctx);
filterBytes = marshalFilter(filter, ctx);
+
+ if (expiryPlc != null)
+ expiryPlcBytes = CU.marshal(ctx, new GridCacheExpiryPolicy(expiryPlc));
}
/** {@inheritDoc} */
@@ -494,6 +502,9 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
keys = unmarshalCollection(keyBytes, ctx, ldr);
vals = unmarshalValueBytesCollection(valBytes, ctx, ldr);
filter = unmarshalFilter(filterBytes, ctx, ldr);
+
+ if (expiryPlcBytes != null)
+ expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr);
}
/** {@inheritDoc} */
@@ -527,7 +538,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
_clone.drTtls = drTtls;
_clone.drExpireTimes = drExpireTimes;
_clone.retval = retval;
- _clone.ttl = ttl;
+ _clone.expiryPlc = expiryPlc;
_clone.filter = filter;
_clone.filterBytes = filterBytes;
_clone.hasPrimary = hasPrimary;
@@ -688,7 +699,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
commState.idx++;
case 15:
- if (!commState.putLong(ttl))
+ if (!commState.putByteArray(expiryPlcBytes))
return false;
commState.idx++;
@@ -928,10 +939,12 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
commState.idx++;
case 15:
- if (buf.remaining() < 8)
+ byte[] expiryPlcBytes0 = commState.getByteArray();
+
+ if (expiryPlcBytes0 == BYTE_ARR_NOT_READ)
return false;
- ttl = commState.getLong();
+ expiryPlcBytes = expiryPlcBytes0;
commState.idx++;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index fe16214..fd4d7dc 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -94,10 +94,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
@GridDirectVersion(1)
private GridCacheVersion nearVer;
- /** Ttl to be used for originating node's near cache update. */
- @GridDirectVersion(1)
- private long nearTtl;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -204,20 +200,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
}
/**
- * @param ttl Time to live to be used for originating node's near cache update.
- */
- public void nearTtl(long ttl) {
- nearTtl = ttl;
- }
-
- /**
- * @return Time to live to be used for originating node's near cache update.
- */
- public long nearTtl() {
- return nearTtl;
- }
-
- /**
* @param nearVer Version generated on primary node to be used for originating node's near cache update.
*/
public void nearVersion(GridCacheVersion nearVer) {
@@ -384,7 +366,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
_clone.nearVals = nearVals;
_clone.nearValBytes = nearValBytes;
_clone.nearVer = nearVer;
- _clone.nearTtl = nearTtl;
}
/** {@inheritDoc} */
@@ -461,12 +442,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
commState.idx++;
case 9:
- if (!commState.putLong(nearTtl))
- return false;
-
- commState.idx++;
-
- case 10:
if (nearValBytes != null) {
if (commState.it == null) {
if (!commState.putInt(nearValBytes.size()))
@@ -493,7 +468,7 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
commState.idx++;
- case 11:
+ case 10:
if (nearValsIdxs != null) {
if (commState.it == null) {
if (!commState.putInt(nearValsIdxs.size()))
@@ -520,7 +495,7 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
commState.idx++;
- case 12:
+ case 11:
if (!commState.putCacheVersion(nearVer))
return false;
@@ -620,14 +595,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
commState.idx++;
case 9:
- if (buf.remaining() < 8)
- return false;
-
- nearTtl = commState.getLong();
-
- commState.idx++;
-
- case 10:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -656,7 +623,7 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
commState.idx++;
- case 11:
+ case 10:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -685,7 +652,7 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
commState.idx++;
- case 12:
+ case 11:
GridCacheVersion nearVer0 = commState.getCacheVersion();
if (nearVer0 == CACHE_VER_NOT_READ)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
index ce5e19c..2aa32c3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -23,6 +23,7 @@ import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
@@ -148,7 +149,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
try {
- processNearAtomicUpdateResponse(ver, key, val, valBytes, res.nearTtl(), req.nodeId(), req.subjectId(),
+ processNearAtomicUpdateResponse(ver,
+ key,
+ val,
+ valBytes,
+ req.expiry(),
+ req.nodeId(),
+ req.subjectId(),
taskName);
}
catch (IgniteCheckedException e) {
@@ -162,7 +169,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
* @param key Key.
* @param val Value.
* @param valBytes Value bytes.
- * @param ttl Time to live.
+ * @param expiryPlc Expiry policy.
* @param nodeId Node ID.
* @throws IgniteCheckedException If failed.
*/
@@ -171,7 +178,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
K key,
@Nullable V val,
@Nullable byte[] valBytes,
- Long ttl,
+ ExpiryPolicy expiryPlc,
UUID nodeId,
UUID subjId,
String taskName
@@ -196,7 +203,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
valBytes,
/*write-through*/false,
/*retval*/false,
- ttl,
+ expiryPlc != null ? expiryPlc : ctx.expiry(),
/*event*/true,
/*metrics*/true,
/*primary*/false,
@@ -253,6 +260,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+ ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
+
for (int i = 0; i < req.nearSize(); i++) {
K key = req.nearKey(i);
@@ -292,7 +301,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
valBytes,
/*write-through*/false,
/*retval*/false,
- req.ttl(),
+ expiry,
/*event*/true,
/*metrics*/true,
/*primary*/false,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
index 88abfc4..ef0b4c3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
@@ -46,6 +46,8 @@ public class IgniteCacheTest extends GridCommonAbstractTest {
assert cnt >= 1 : "At least one grid must be started";
startGridsMultiThreaded(cnt);
+
+ awaitPartitionMapExchange();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
index f72619b..0d22f62 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
@@ -10,15 +10,25 @@
package org.apache.ignite.internal.processors.cache.expiry;
import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.processors.cache.*;
import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.testframework.*;
+import org.jetbrains.annotations.*;
import javax.cache.configuration.*;
import javax.cache.expiry.*;
+import java.util.*;
import java.util.concurrent.*;
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
/**
*
*/
@@ -36,59 +46,82 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
stopAllGrids();
}
- @Override
- protected int gridCount() {
+ /** {@inheritDoc} This test returns a fixed count of two grids. */
+ @Override protected int gridCount() {
return 2;
}
/**
- *
+ * Checks that the creation TTL of the configured policy is applied when an entry is created,
+ * that it is still reported after reads and after an update made through a different policy,
+ * and that a policy supplied through {@code withExpiryPolicy} is used for an entry created
+ * after a remove.
+ *
+ * @throws Exception If failed.
*/
- private class TestCreatedPolicy implements ExpiryPolicy {
- /** */
- private final Duration duration;
+ public void testCreated() throws Exception {
+ factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null)); // Only the creation TTL is set.
- /**
- * @param ttl TTL for creation.
- */
- TestCreatedPolicy(long ttl) {
- this.duration = new Duration(TimeUnit.MILLISECONDS, ttl);
- }
+ startGrids();
- /** {@inheritDoc} */
- @Override public Duration getExpiryForCreation() {
- return duration;
- }
+ Collection<Integer> keys = keys();
- /** {@inheritDoc} */
- @Override public Duration getExpiryForAccess() {
- return null;
- }
+ IgniteCache<Integer, Integer> cache = jcache(0);
- /** {@inheritDoc} */
- @Override public Duration getExpiryForUpdate() {
- return null;
+ for (final Integer key : keys) {
+ log.info("Test key: " + key);
+
+ cache.put(key, 1); // Create, expected to get the 60_000 ms TTL from the configured policy.
+
+ checkTtl(key, 60_000);
+
+ for (int i = 0; i < gridCount(); i++) {
+ assertEquals((Integer)1, cache.get(key)); // NOTE(review): loop index i is unused here; jcache(i).get(key) may have been intended - confirm.
+
+ checkTtl(key, 60_000); // Read is expected to leave the TTL as it was.
+ }
+
+ cache.withExpiryPolicy(new TestPolicy(1000L, null, null)).put(key, 2); // Update, should not change TTL.
+
+ checkTtl(key, 60_000);
+
+ assertEquals((Integer)2, cache.get(key));
+
+ assertTrue(cache.remove(key)); // Remove so that the following put creates the entry again.
+
+ cache.withExpiryPolicy(new TestPolicy(1000L, null, null)).put(key, 3); // Create with provided TTL.
+
+ checkTtl(key, 1000);
+
+ waitExpired(key);
}
}
/**
+ * Builds the collection of keys the test iterates over, using the cache of grid 0: the result
+ * of {@code primaryKey}, plus the result of {@code backupKey} when {@code gridCount()} is
+ * greater than one, plus the result of {@code nearKey} when the cache distribution mode is
+ * {@code NEAR_PARTITIONED}.
+ *
+ * @return Test keys.
* @throws Exception If failed.
*/
- public void testCreated() throws Exception {
- factory = new FactoryBuilder.SingletonFactory<>(new TestCreatedPolicy(1000));
+ private Collection<Integer> keys() throws Exception {
+ GridCache<Integer, Object> cache = cache(0);
- startGrids();
+ Collection<Integer> keys = new ArrayList<>();
- final Integer key = 1;
+ keys.add(primaryKey(cache));
- IgniteCache<Integer, Integer> cache = jcache(0);
+ if (gridCount() > 1) {
+ keys.add(backupKey(cache));
+
+ if (cache.configuration().getDistributionMode() == NEAR_PARTITIONED)
+ keys.add(nearKey(cache));
+ }
- cache.put(1, 1);
+ return keys;
+ }
+ /**
+ * Waits on a condition that peeks the key locally on every grid, then asserts that both
+ * {@code localPeek} and {@code get} return {@code null} for the key on every grid. While
+ * checking, logs for each grid whether its local node is primary or backup for the key
+ * according to the affinity of grid 0's cache.
+ *
+ * @param key Key.
+ * @throws Exception If failed.
+ */
+ private void waitExpired(final Integer key) throws Exception {
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
for (int i = 0; i < gridCount(); i++) {
- Object val = cache(i).peek(key);
+ Object val = jcache(i).localPeek(key);
log.info("Value [grid=" + i + ", val=" + val + ']');
@@ -98,10 +131,50 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
return false;
}
- }, 2000);
+ }, 3000); // Presumably the wait timeout in milliseconds - confirm against GridTestUtils.
+
+ GridCache<Integer, Object> cache = cache(0);
+
+ for (int i = 0; i < gridCount(); i++) {
+ ClusterNode node = grid(i).cluster().localNode();
+
+ Object val = jcache(i).localPeek(key);
+
+ log.info("Value [grid=" + i +
+ ", primary=" + cache.affinity().isPrimary(node, key) +
+ ", backup=" + cache.affinity().isBackup(node, key) + ']');
+
+ assertNull("Unexpected non-null value for grid " + i, val);
+ }
for (int i = 0; i < gridCount(); i++)
- assertNull("Unexpected non-null value for grid " + i, cache.get(key));
+ assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key));
+ }
+
+ /**
+ * Asserts on every grid that the entry for the key reports the expected TTL. The entry is
+ * looked up with {@code peekEx} in the grid's internal cache and, if it is absent there and
+ * the cache context is near, in the cache returned by {@code near().dht()}. A missing entry
+ * is tolerated only on a grid other than grid 0 whose local node is neither primary nor
+ * backup for the key.
+ *
+ * @param key Key.
+ * @param ttl Expected TTL, compared with the value returned by the entry's {@code ttl()}.
+ * @throws Exception If failed.
+ */
+ private void checkTtl(Object key, long ttl) throws Exception {
+ for (int i = 0; i < gridCount(); i++) {
+ GridKernal grid = (GridKernal)grid(i);
+
+ GridCacheAdapter<Object, Object> cache = grid.context().cache().internalCache();
+
+ GridCacheEntryEx<Object, Object> e = cache.peekEx(key);
+
+ if (e == null && cache.context().isNear())
+ e = cache.context().near().dht().peekEx(key);
+
+ if (e == null) {
+ assertTrue(i > 0); // Grid 0 must always have the entry.
+
+ assertTrue(!cache.affinity().isPrimaryOrBackup(grid.localNode(), key)); // Affinity nodes must have the entry.
+ }
+ else
+ assertEquals("Unexpected ttl for grid " + i, ttl, e.ttl());
+ }
}
/** {@inheritDoc} */
@@ -110,11 +183,61 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
GridCacheConfiguration cfg = super.cacheConfiguration(gridName);
- cfg.setCacheMode(GridCacheMode.PARTITIONED);
- cfg.setAtomicityMode(GridCacheAtomicityMode.ATOMIC);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setAtomicityMode(ATOMIC);
+ cfg.setBackups(1);
+
+ cfg.setDistributionMode(PARTITIONED_ONLY);
cfg.setExpiryPolicyFactory(factory);
return cfg;
}
+
+ /**
+ * Expiry policy for tests with independently configurable creation, access and update TTLs,
+ * given in milliseconds. When a TTL is {@code null} the corresponding getter returns
+ * {@code null} instead of a {@code Duration}.
+ * <p>
+ * NOTE(review): this is declared as a non-static inner class with non-final fields, although
+ * the fields are only assigned in the constructor within this class; confirm whether it can
+ * be made {@code static} with {@code final} fields.
+ */
+ private class TestPolicy implements ExpiryPolicy {
+ /** TTL for creation in milliseconds, or {@code null} if not set. */
+ private Long create;
+
+ /** TTL for access in milliseconds, or {@code null} if not set. */
+ private Long access;
+
+ /** TTL for update in milliseconds, or {@code null} if not set. */
+ private Long update;
+
+ /**
+ * @param create TTL for creation.
+ * @param access TTL for access.
+ * @param update TTL for update.
+ */
+ TestPolicy(@Nullable Long create,
+ @Nullable Long access,
+ @Nullable Long update) {
+ this.create = create;
+ this.access = access;
+ this.update = update;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Duration getExpiryForCreation() {
+ return create != null ? new Duration(TimeUnit.MILLISECONDS, create) : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Duration getExpiryForAccess() {
+ return access != null ? new Duration(TimeUnit.MILLISECONDS, access) : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Duration getExpiryForUpdate() {
+ return update != null ? new Duration(TimeUnit.MILLISECONDS, update) : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestPolicy.class, this);
+ }
+ }
}