You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/23 09:22:46 UTC
[10/38] incubator-ignite git commit: # ignite-41
# ignite-41
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/18b5b5a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/18b5b5a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/18b5b5a9
Branch: refs/heads/ignite-1
Commit: 18b5b5a93159e2a45e558083f7a975019d71fc87
Parents: 7642979
Author: sboikov <se...@inria.fr>
Authored: Thu Dec 18 07:27:44 2014 +0300
Committer: sboikov <se...@inria.fr>
Committed: Thu Dec 18 07:27:44 2014 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryEx.java | 6 ++++
.../processors/cache/GridCacheMapEntry.java | 13 +++++++++
.../distributed/GridCacheTtlUpdateRequest.java | 27 ++++++++++++++++++
.../distributed/dht/GridDhtCacheAdapter.java | 22 ++++++++++++++-
.../dht/GridPartitionedGetFuture.java | 12 +++++---
.../dht/atomic/GridDhtAtomicCache.java | 10 ++++---
.../dht/colocated/GridDhtColocatedCache.java | 3 +-
.../distributed/near/GridNearGetFuture.java | 3 +-
.../distributed/near/GridNearGetRequest.java | 21 +++++++++++++-
.../IgniteCacheExpiryPolicyAbstractTest.java | 29 +++++++++++++++++++-
.../processors/cache/GridCacheTestEntryEx.java | 5 ++++
11 files changed, 138 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index 2b38247..c6e3ea6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -873,6 +873,12 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
public long ttl() throws GridCacheEntryRemovedException;
/**
+ * @param ver Version.
+ * @param ttl Time to live.
+ */
+ public void updateTtl(GridCacheVersion ver, long ttl);
+
+ /**
* @return Value.
* @throws IgniteCheckedException If failed to read from swap storage.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index 00f7382..e4ccc11 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -3428,6 +3428,19 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
}
/** {@inheritDoc} */
+ @Override public void updateTtl(GridCacheVersion ver, long ttl) {
+ synchronized (this) {
+ try {
+ if (ver.equals(version())) // Apply the TTL only while the entry still has the given version.
+ updateTtl(ttl);
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // Ignore: the entry has been removed, so there is no TTL left to update.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public synchronized void keyBytes(byte[] keyBytes) throws GridCacheEntryRemovedException {
checkObsolete();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
index 71e314e..c862904 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -56,6 +56,13 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
}
/**
+ * @return TTL carried by this request.
+ */
+ public long ttl() {
+ return ttl;
+ }
+
+ /**
* @param key Key.
* @param keyBytes Key bytes.
* @param ver Version.
@@ -83,6 +90,26 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
return keys;
}
+ /**
+ * @param idx Entry index, expected to be in {@code [0, keys.size())} (checked by assertion only).
+ * @return Key stored at the given index.
+ */
+ public K key(int idx) {
+ assert idx >= 0 && idx < keys.size() : idx;
+
+ return keys.get(idx);
+ }
+
+ /**
+ * @param idx Entry index, expected to be in {@code [0, vers.size())} (checked by assertion only).
+ * @return Version stored at the given index.
+ */
+ public GridCacheVersion version(int idx) {
+ assert idx >= 0 && idx < vers.size() : idx;
+
+ return vers.get(idx);
+ }
+
/** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr)
throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 3557d17..328a8d5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -106,7 +106,27 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @param req Request.
*/
private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) {
- log.info("Ttl update: " + req);
+ int size = req.keys().size();
+
+ for (int i = 0; i < size; i++) { // Keys and versions are paired by index.
+ try {
+ GridCacheEntryEx<K, V> entry;
+
+ if (ctx.isSwapOrOffheapEnabled()) {
+ entry = ctx.cache().entryEx(req.key(i), true);
+
+ entry.unswap(true, false);
+ }
+ else
+ entry = ctx.cache().peekEx(req.key(i)); // A null result is skipped below.
+
+ if (entry != null)
+ entry.updateTtl(req.version(i), req.ttl()); // One TTL value is shared by all keys of the request.
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed to unswap entry.", e); // Logged per key; the remaining keys are still processed.
+ }
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 5f6af05..34539d4 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
@@ -89,6 +88,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
/** Whether to deserialize portable objects. */
private boolean deserializePortable;
+ /** Access expiry policy, may be {@code null}. */
+ private GridCacheAccessExpiryPolicy expiry;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -114,11 +116,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filters,
@Nullable UUID subjId,
String taskName,
- boolean deserializePortable
+ boolean deserializePortable,
+ @Nullable GridCacheAccessExpiryPolicy expiry
) {
super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
- assert cctx != null;
assert !F.isEmpty(keys);
this.cctx = cctx;
@@ -130,6 +132,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
this.subjId = subjId;
this.deserializePortable = deserializePortable;
this.taskName = taskName;
+ this.expiry = expiry;
futId = IgniteUuid.randomUuid();
@@ -352,7 +355,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
topVer,
filters,
subjId,
- taskName == null ? 0 : taskName.hashCode());
+ taskName == null ? 0 : taskName.hashCode(),
+ expiry != null ? expiry.ttl() : -1L);
add(fut); // Append new future.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6287e16..1d49097 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -729,15 +729,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
long topVer = ctx.affinity().affinityTopologyVersion();
+ final GridCacheAccessExpiryPolicy expiry =
+ GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+
// Optimisation: try to resolve value locally and escape 'get future' creation.
if (!reload && !forcePrimary) {
Map<K, V> locVals = new HashMap<>(keys.size(), 1.0f);
boolean success = true;
- final GridCacheAccessExpiryPolicy expiry =
- GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
-
// Optimistically expect that all keys are available locally (avoid creation of get future).
for (K key : keys) {
GridCacheEntryEx<K, V> entry = null;
@@ -819,6 +819,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
GridCacheTtlUpdateRequest<K, V> req = expiry.request();
+ assert req != null;
assert !F.isEmpty(req.keys());
Collection<ClusterNode> nodes = ctx.affinity().remoteNodes(req.keys(), -1);
@@ -847,7 +848,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
filter,
subjId,
taskName,
- deserializePortable);
+ deserializePortable,
+ expiry);
fut.init();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 1052e1d..84f3165 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -319,7 +319,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
filter,
subjId,
taskName,
- deserializePortable);
+ deserializePortable,
+ null);
fut.init();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
index 1f1de06..d23236a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -350,7 +350,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
topVer,
filters,
subjId,
- taskName == null ? 0 : taskName.hashCode());
+ taskName == null ? 0 : taskName.hashCode(),
+ -1L);
add(fut); // Append new future.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java
index 12aacb7..552012b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -71,6 +71,9 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
@GridDirectVersion(2)
private int taskNameHash;
+ /** Access TTL sent with the request; senders pass {@code -1L} when no access expiry policy applies. */
+ private long accessTtl;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -98,7 +101,8 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
long topVer,
IgnitePredicate<GridCacheEntry<K, V>>[] filter,
UUID subjId,
- int taskNameHash
+ int taskNameHash,
+ long accessTtl
) {
assert futId != null;
assert miniId != null;
@@ -115,6 +119,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
this.filter = filter;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
+ this.accessTtl = accessTtl;
}
/**
@@ -364,6 +369,12 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
commState.idx++;
+ case 12:
+ if (!commState.putLong(accessTtl))
+ return false;
+
+ commState.idx++;
+
}
return true;
@@ -513,6 +524,14 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
commState.idx++;
+ case 12:
+ if (buf.remaining() < 8)
+ return false;
+
+ accessTtl = commState.getLong();
+
+ commState.idx++;
+
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 93b0405..5ab6033 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -16,6 +16,7 @@ import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.testframework.*;
import org.jetbrains.annotations.*;
@@ -150,7 +151,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
assertEquals((Integer)1, cache.get(key));
- checkTtl(key, 62_000L);
+ checkTtl(key, 62_000L, true);
}
/**
@@ -578,6 +579,15 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
* @throws Exception If failed.
*/
private void checkTtl(Object key, long ttl) throws Exception {
+ checkTtl(key, ttl, false);
+ }
+
+ /**
+ * @param key Key.
+ * @param ttl TTL.
+ * @throws Exception If failed.
+ */
+ private void checkTtl(Object key, final long ttl, boolean wait) throws Exception {
boolean found = false;
for (int i = 0; i < gridCount(); i++) {
@@ -595,6 +605,23 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
else {
found = true;
+ if (wait) {
+ final GridCacheEntryEx<Object, Object> e0 = e;
+
+ GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ try {
+ return e0.ttl() == ttl;
+ }
+ catch (Exception e) {
+ fail("Unexpected error: " + e);
+
+ return true;
+ }
+ }
+ }, 3000);
+ }
+
assertEquals("Unexpected ttl for grid " + i, ttl, e.ttl());
if (ttl > 0)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
index bbce8ca..6a21fe7 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
@@ -763,6 +763,11 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
return ttl;
}
+ /** {@inheritDoc} */
+ @Override public void updateTtl(GridCacheVersion ver, long ttl) {
+ throw new UnsupportedOperationException();
+ }
+
/** {@inheritDoc} */
@Override public V unswap() throws IgniteCheckedException {
return null;