You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/23 09:22:46 UTC

[10/38] incubator-ignite git commit: # ignite-41

# ignite-41


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/18b5b5a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/18b5b5a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/18b5b5a9

Branch: refs/heads/ignite-1
Commit: 18b5b5a93159e2a45e558083f7a975019d71fc87
Parents: 7642979
Author: sboikov <se...@inria.fr>
Authored: Thu Dec 18 07:27:44 2014 +0300
Committer: sboikov <se...@inria.fr>
Committed: Thu Dec 18 07:27:44 2014 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |  6 ++++
 .../processors/cache/GridCacheMapEntry.java     | 13 +++++++++
 .../distributed/GridCacheTtlUpdateRequest.java  | 27 ++++++++++++++++++
 .../distributed/dht/GridDhtCacheAdapter.java    | 22 ++++++++++++++-
 .../dht/GridPartitionedGetFuture.java           | 12 +++++---
 .../dht/atomic/GridDhtAtomicCache.java          | 10 ++++---
 .../dht/colocated/GridDhtColocatedCache.java    |  3 +-
 .../distributed/near/GridNearGetFuture.java     |  3 +-
 .../distributed/near/GridNearGetRequest.java    | 21 +++++++++++++-
 .../IgniteCacheExpiryPolicyAbstractTest.java    | 29 +++++++++++++++++++-
 .../processors/cache/GridCacheTestEntryEx.java  |  5 ++++
 11 files changed, 138 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index 2b38247..c6e3ea6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -873,6 +873,12 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
     public long ttl() throws GridCacheEntryRemovedException;
 
     /**
+     * @param ver Version.
+     * @param ttl Time to live.
+     */
+    public void updateTtl(GridCacheVersion ver, long ttl);
+
+    /**
      * @return Value.
      * @throws IgniteCheckedException If failed to read from swap storage.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index 00f7382..e4ccc11 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -3428,6 +3428,19 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     }
 
     /** {@inheritDoc} */
+    @Override public void updateTtl(GridCacheVersion ver, long ttl) {
+        synchronized (this) {
+            try {
+                if (ver.equals(version()))
+                    updateTtl(ttl);
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // No-op.
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public synchronized void keyBytes(byte[] keyBytes) throws GridCacheEntryRemovedException {
         checkObsolete();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
index 71e314e..c862904 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -56,6 +56,13 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
     }
 
     /**
+     * @return TTL.
+     */
+    public long ttl() {
+        return ttl;
+    }
+
+    /**
      * @param key Key.
      * @param keyBytes Key bytes.
      * @param ver Version.
@@ -83,6 +90,26 @@ public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
         return keys;
     }
 
+    /**
+     * @param idx Entry index.
+     * @return Key.
+     */
+    public K key(int idx) {
+        assert idx >= 0 && idx < keys.size() : idx;
+
+        return keys.get(idx);
+    }
+
+    /**
+     * @param idx Entry index.
+     * @return Version.
+     */
+    public GridCacheVersion version(int idx) {
+        assert idx >= 0 && idx < vers.size() : idx;
+
+        return vers.get(idx);
+    }
+
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr)
         throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 3557d17..328a8d5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -106,7 +106,27 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param req Request.
      */
     private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) {
-        log.info("Ttl update: " + req);
+        int size = req.keys().size();
+
+        for (int i = 0; i < size; i++) {
+            try {
+                GridCacheEntryEx<K, V> entry;
+
+                if (ctx.isSwapOrOffheapEnabled()) {
+                    entry = ctx.cache().entryEx(req.key(i), true);
+
+                    entry.unswap(true, false);
+                }
+                else
+                    entry = ctx.cache().peekEx(req.key(i));
+
+                if (entry != null)
+                    entry.updateTtl(req.version(i), req.ttl());
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to unswap entry.", e);
+            }
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 5f6af05..34539d4 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
@@ -89,6 +88,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     /** Whether to deserialize portable objects. */
     private boolean deserializePortable;
 
+    /** */
+    private GridCacheAccessExpiryPolicy expiry;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -114,11 +116,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filters,
         @Nullable UUID subjId,
         String taskName,
-        boolean deserializePortable
+        boolean deserializePortable,
+        @Nullable GridCacheAccessExpiryPolicy expiry
     ) {
         super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
 
-        assert cctx != null;
         assert !F.isEmpty(keys);
 
         this.cctx = cctx;
@@ -130,6 +132,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         this.subjId = subjId;
         this.deserializePortable = deserializePortable;
         this.taskName = taskName;
+        this.expiry = expiry;
 
         futId = IgniteUuid.randomUuid();
 
@@ -352,7 +355,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                     topVer,
                     filters,
                     subjId,
-                    taskName == null ? 0 : taskName.hashCode());
+                    taskName == null ? 0 : taskName.hashCode(),
+                    expiry != null ? expiry.ttl() : -1L);
 
                 add(fut); // Append new future.
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6287e16..1d49097 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -729,15 +729,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         long topVer = ctx.affinity().affinityTopologyVersion();
 
+        final GridCacheAccessExpiryPolicy expiry =
+            GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+
         // Optimisation: try to resolve value locally and escape 'get future' creation.
         if (!reload && !forcePrimary) {
             Map<K, V> locVals = new HashMap<>(keys.size(), 1.0f);
 
             boolean success = true;
 
-            final GridCacheAccessExpiryPolicy expiry =
-                GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
-
             // Optimistically expect that all keys are available locally (avoid creation of get future).
             for (K key : keys) {
                 GridCacheEntryEx<K, V> entry = null;
@@ -819,6 +819,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             try {
                                 GridCacheTtlUpdateRequest<K, V> req = expiry.request();
 
+                                assert req != null;
                                 assert !F.isEmpty(req.keys());
 
                                 Collection<ClusterNode> nodes = ctx.affinity().remoteNodes(req.keys(), -1);
@@ -847,7 +848,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             filter,
             subjId,
             taskName,
-            deserializePortable);
+            deserializePortable,
+            expiry);
 
         fut.init();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 1052e1d..84f3165 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -319,7 +319,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             filter,
             subjId,
             taskName,
-            deserializePortable);
+            deserializePortable,
+            null);
 
         fut.init();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
index 1f1de06..d23236a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -350,7 +350,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     topVer,
                     filters,
                     subjId,
-                    taskName == null ? 0 : taskName.hashCode());
+                    taskName == null ? 0 : taskName.hashCode(),
+                    -1L);
 
                 add(fut); // Append new future.
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java
index 12aacb7..552012b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -71,6 +71,9 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
     @GridDirectVersion(2)
     private int taskNameHash;
 
+    /** */
+    private long accessTtl;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -98,7 +101,8 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
         long topVer,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter,
         UUID subjId,
-        int taskNameHash
+        int taskNameHash,
+        long accessTtl
     ) {
         assert futId != null;
         assert miniId != null;
@@ -115,6 +119,7 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
         this.filter = filter;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
+        this.accessTtl = accessTtl;
     }
 
     /**
@@ -364,6 +369,12 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
 
                 commState.idx++;
 
+            case 12:
+                if (!commState.putLong(accessTtl))
+                    return false;
+
+                commState.idx++;
+
         }
 
         return true;
@@ -513,6 +524,14 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
 
                 commState.idx++;
 
+            case 12:
+                if (buf.remaining() < 8)
+                    return false;
+
+                accessTtl = commState.getLong();
+
+                commState.idx++;
+
         }
 
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 93b0405..5ab6033 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -16,6 +16,7 @@ import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.testframework.*;
 import org.jetbrains.annotations.*;
@@ -150,7 +151,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
         assertEquals((Integer)1, cache.get(key));
 
-        checkTtl(key, 62_000L);
+        checkTtl(key, 62_000L, true);
     }
 
     /**
@@ -578,6 +579,15 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
      * @throws Exception If failed.
      */
     private void checkTtl(Object key, long ttl) throws Exception {
+        checkTtl(key, ttl, false);
+    }
+
+    /**
+     * @param key Key.
+     * @param ttl TTL.
+     * @throws Exception If failed.
+     */
+    private void checkTtl(Object key, final long ttl, boolean wait) throws Exception {
         boolean found = false;
 
         for (int i = 0; i < gridCount(); i++) {
@@ -595,6 +605,23 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
             else {
                 found = true;
 
+                if (wait) {
+                    final GridCacheEntryEx<Object, Object> e0 = e;
+
+                    GridTestUtils.waitForCondition(new PA() {
+                        @Override public boolean apply() {
+                            try {
+                                return e0.ttl() == ttl;
+                            }
+                            catch (Exception e) {
+                                fail("Unexpected error: " + e);
+
+                                return true;
+                            }
+                        }
+                    }, 3000);
+                }
+
                 assertEquals("Unexpected ttl for grid " + i, ttl, e.ttl());
 
                 if (ttl > 0)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/18b5b5a9/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
index bbce8ca..6a21fe7 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
@@ -763,6 +763,11 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
         return ttl;
     }
 
+    /** @inheritDoc */
+    @Override public void updateTtl(GridCacheVersion ver, long ttl) {
+        throw new UnsupportedOperationException();
+    }
+
     /** {@inheritDoc} */
     @Override public V unswap() throws IgniteCheckedException {
         return null;