You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/01/16 15:50:17 UTC

incubator-ignite git commit: IGNITE-54 Implemented removal of primary entries on each node

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-54 [created] 3344e7271


IGNITE-54
Implemented removal of primary entries on each node


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3344e727
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3344e727
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3344e727

Branch: refs/heads/ignite-54
Commit: 3344e7271f20ec91a8b17d26a3d6455f054e6a3d
Parents: 6efcfb2
Author: avinogradov <av...@gridgain.com>
Authored: Fri Jan 16 17:49:46 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Fri Jan 16 17:49:46 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |  15 ++-
 .../grid/cache/GridCacheProjection.java         |  17 +++
 .../processors/cache/GridCacheAdapter.java      | 134 +++++++++++++++++--
 .../cache/GridCacheProjectionImpl.java          |   5 +
 .../processors/cache/GridCacheProxyImpl.java    |  12 ++
 .../processors/cache/IgniteCacheTest.java       |  20 +++
 6 files changed, 189 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index ab45aea..2533d34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -458,8 +458,19 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
 
     /** {@inheritDoc} */
     @Override public void removeAll() {
-        // TODO IGNITE-1.
-        throw new UnsupportedOperationException();
+        try {
+            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+
+            try {
+                delegate.removeAll();
+            }
+            finally {
+                gate.leave(prev);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheException(e);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java
index d188b71..2a84fbe 100644
--- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java
+++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java
@@ -1235,6 +1235,23 @@ public interface GridCacheProjection<K, V> extends Iterable<GridCacheEntry<K, V>
     public Set<K> keySet();
 
     /**
+     * Set of keys cached on this node. You can remove elements from this set, but you cannot add elements
+     * to this set. All removal operations will be reflected on the cache itself.
+     * <p>
+     * Iterator over this set will not fail if set was concurrently updated
+     * by another thread. This means that iterator may or may not return latest
+     * keys depending on whether they were added before or after current
+     * iterator position.
+     * <p>
+     * NOTE: this operation is not distributed and returns only the keys cached on this node.
+     *
+     * @param filter Optional filter to check prior to getting key from cache. Note
+     * that filter is checked atomically together with get operation.
+     * @return Key set for this cache projection.
+     */
+    public Set<K> keySet(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) ;
+
+    /**
      * Set of keys for which this node is primary.
      * This set is dynamic and may change with grid topology changes.
      * Note that this set will contain mappings for all keys, even if their values are

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 97d914a..ffd015f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -78,6 +78,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
     /** clearAll() split threshold. */
     public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;
 
+    /** removeAll() batch size. */
+    private static final long REMOVE_ALL_BATCH_SIZE = 100L;
+
     /** Deserialization stash. */
     private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String,
                 String>>() {
@@ -2994,22 +2997,38 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
 
     /** {@inheritDoc} */
     @Override public void removeAll(IgnitePredicate<GridCacheEntry<K, V>>... filter) throws IgniteCheckedException {
-        ctx.denyOnLocalRead();
+        try {
+            if (F.isEmptyOrNulls(filter))
+                filter = ctx.trueArray();
 
-        if (F.isEmptyOrNulls(filter))
-            filter = ctx.trueArray();
+            long topVer;
 
-        final IgnitePredicate<GridCacheEntry<K, V>>[] p = filter;
+            do {
+                topVer = ctx.affinity().affinityTopologyVersion();
 
-        syncOp(new SyncInOp(false) {
-            @Override public void inOp(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException {
-                tx.removeAllAsync(ctx, keySet(p), null, false, null).get();
-            }
+                // Send job to all nodes.
+                Collection<ClusterNode> nodes = ctx.grid().forCache(name()).nodes();
 
-            @Override public String toString() {
-                return "removeAll [filter=" + Arrays.toString(p) + ']';
-            }
-        });
+                IgniteFuture<Object> fut = null;
+
+                if (!nodes.isEmpty())
+                    fut = ctx.closures().callAsyncNoFailover(BROADCAST, new GlobalRemoveAllCallable<>(name(), topVer, REMOVE_ALL_BATCH_SIZE, filter), nodes, true);
+
+                if (fut != null)
+                    fut.get();
+
+            } while (ctx.affinity().affinityTopologyVersion() > topVer);
+        }
+        catch (ClusterGroupEmptyException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]");
+        }
+        catch (ComputeTaskTimeoutException e) {
+            U.warn(log, "Timed out waiting for remote nodes to finish cache remove (consider increasing " +
+                    "'networkTimeout' configuration property) [cacheName=" + name() + "]");
+
+            throw e;
+        }
     }
 
     /** {@inheritDoc} */
@@ -4670,6 +4689,97 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
     }
 
     /**
+     * Internal callable which removes all primary key mappings
+     * from the cache with the given name on the node where it runs.
+     */
+    @GridInternal
+    private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache name. */
+        private String cacheName;
+
+        /** Topology version. */
+        private long topVer;
+
+        /** Remove batch size. */
+        private long rmvBatchSz;
+
+        /** Filters. */
+        private IgnitePredicate<GridCacheEntry<K, V>>[] filter;
+
+        /** Injected grid instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /**
+         * Empty constructor for serialization.
+         */
+        public GlobalRemoveAllCallable() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Topology version.
+         * @param rmvBatchSz Remove batch size.
+         * @param filter Filter.
+         */
+        private GlobalRemoveAllCallable(String cacheName, long topVer, long rmvBatchSz, IgnitePredicate<GridCacheEntry<K, V>> ... filter) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+            this.rmvBatchSz = rmvBatchSz;
+            this.filter = filter;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public Object call() throws Exception {
+            Set<K> keys = new HashSet<>();
+
+            final GridKernal grid = ((GridKernal) ignite);
+
+            final GridCache<K,V> cache = grid.cachex(cacheName);
+
+            final GridCacheContext<K, V> ctx = grid.context().cache().<K, V>internalCache(cacheName).context();
+
+            assert cache != null;
+
+            for (K k : cache.keySet(filter)) {
+                if (ctx.affinity().primary(ctx.localNode(), k, topVer))
+                    keys.add(k);
+                if (keys.size() >= rmvBatchSz) {
+                    cache.removeAll(keys);
+
+                    keys.clear();
+                }
+            }
+            cache.removeAll(keys);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeString(out, cacheName);
+            out.writeLong(topVer);
+            out.writeLong(rmvBatchSz);
+            out.writeObject(filter);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            cacheName = U.readString(in);
+            topVer = in.readLong();
+            rmvBatchSz = in.readLong();
+            filter = (IgnitePredicate<GridCacheEntry<K, V>>[]) in.readObject();
+        }
+    }
+
+    /**
      * Internal callable which performs {@link GridCacheProjection#size()} or {@link GridCacheProjection#primarySize()}
      * operation on a cache with the given name.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
index a9896e5..ccb045b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
@@ -896,6 +896,11 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public Set<K> keySet(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
+        return cache.keySet(filter);
+    }
+
+    /** {@inheritDoc} */
     @Override public Set<K> primaryKeySet() {
         return cache.primaryKeySet(entryFilter(true));
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
index c88183e..b0a4713 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
@@ -1002,6 +1002,18 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
         }
     }
 
+     /** {@inheritDoc} */
+    @Override public Set<K> keySet(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return delegate.keySet(filter);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public Set<K> primaryKeySet() {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
index fed75ec..6099928 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
@@ -30,6 +30,8 @@ import org.gridgain.testframework.junits.common.*;
  *
  */
 public class IgniteCacheTest extends GridCommonAbstractTest {
+    private static long ENTRY_COUNT = 1000;
+
     /** */
     private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
@@ -105,6 +107,24 @@ public class IgniteCacheTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testPutGetRemoveAll() throws Exception {
+        IgniteCache<Integer, String> cache = jcache();
+
+        for (int i = 0; i < ENTRY_COUNT; i++)
+            cache.put(i, String.valueOf(i));
+
+        for (int i = 0; i < ENTRY_COUNT; i++)
+            assertEquals(String.valueOf(i), cache.get(i));
+
+        cache.removeAll();
+
+        for (int i = 0; i < ENTRY_COUNT; i++)
+            assertNull(cache.get(i));
+    }
+
+    /**
      * @return Cache.
      */
     protected <K, V> IgniteCache<K, V> jcache() {