You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/01/16 15:50:17 UTC
incubator-ignite git commit: IGNITE-54 Implemented removal of primary
entries on each node
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-54 [created] 3344e7271
IGNITE-54
Implemented removal of primary entries on each node
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3344e727
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3344e727
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3344e727
Branch: refs/heads/ignite-54
Commit: 3344e7271f20ec91a8b17d26a3d6455f054e6a3d
Parents: 6efcfb2
Author: avinogradov <av...@gridgain.com>
Authored: Fri Jan 16 17:49:46 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Fri Jan 16 17:49:46 2015 +0300
----------------------------------------------------------------------
.../processors/cache/IgniteCacheProxy.java | 15 ++-
.../grid/cache/GridCacheProjection.java | 17 +++
.../processors/cache/GridCacheAdapter.java | 134 +++++++++++++++++--
.../cache/GridCacheProjectionImpl.java | 5 +
.../processors/cache/GridCacheProxyImpl.java | 12 ++
.../processors/cache/IgniteCacheTest.java | 20 +++
6 files changed, 189 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index ab45aea..2533d34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -458,8 +458,19 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public void removeAll() {
- // TODO IGNITE-1.
- throw new UnsupportedOperationException();
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(null); // Keep the value returned by enter() so it can be handed back to leave().
+
+ try {
+ delegate.removeAll(); // Forward the call to the wrapped delegate.
+ }
+ finally {
+ gate.leave(prev); // Always leave the gate, even when the delegate throws.
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheException(e); // Rethrow the checked failure wrapped in an unchecked CacheException.
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java
index d188b71..2a84fbe 100644
--- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java
+++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheProjection.java
@@ -1235,6 +1235,23 @@ public interface GridCacheProjection<K, V> extends Iterable<GridCacheEntry<K, V>
public Set<K> keySet();
/**
+ * Set of keys cached on this node. You can remove elements from this set, but you cannot add elements
+ * to this set. All removal operations will be reflected on the cache itself.
+ * <p>
+ * Iterator over this set will not fail if the set was concurrently updated
+ * by another thread. This means that the iterator may or may not return the latest
+ * keys depending on whether they were added before or after the current
+ * iterator position.
+ * <p>
+ * NOTE: this operation is not distributed and returns only the keys cached on this node.
+ *
+ * @param filter Optional filter to check prior to getting a key from the cache. Note
+ * that the filter is checked atomically together with the get operation.
+ * @return Key set for this cache projection.
+ */
+ public Set<K> keySet(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) ;
+
+ /**
* Set of keys for which this node is primary.
* This set is dynamic and may change with grid topology changes.
* Note that this set will contain mappings for all keys, even if their values are
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 97d914a..ffd015f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -78,6 +78,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
/** clearAll() split threshold. */
public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;
+ /** Batch size that removeAll() hands to {@code GlobalRemoveAllCallable}: keys are removed in chunks of at most this many. */
+ private static final long REMOVE_ALL_BATCH_SIZE = 100L;
+
/** Deserialization stash. */
private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String,
String>>() {
@@ -2994,22 +2997,38 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
/** {@inheritDoc} */
@Override public void removeAll(IgnitePredicate<GridCacheEntry<K, V>>... filter) throws IgniteCheckedException {
- ctx.denyOnLocalRead();
+ try {
+ if (F.isEmptyOrNulls(filter))
+ filter = ctx.trueArray(); // No usable filter supplied: substitute ctx.trueArray() (presumably an always-true filter; confirm).
- if (F.isEmptyOrNulls(filter))
- filter = ctx.trueArray();
+ long topVer;
- final IgnitePredicate<GridCacheEntry<K, V>>[] p = filter;
+ do {
+ topVer = ctx.affinity().affinityTopologyVersion(); // Topology version captured before this round's broadcast.
- syncOp(new SyncInOp(false) {
- @Override public void inOp(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException {
- tx.removeAllAsync(ctx, keySet(p), null, false, null).get();
- }
+ // Send the remove job to all nodes returned by forCache(name()).
+ Collection<ClusterNode> nodes = ctx.grid().forCache(name()).nodes();
- @Override public String toString() {
- return "removeAll [filter=" + Arrays.toString(p) + ']';
- }
- });
+ IgniteFuture<Object> fut = null;
+
+ if (!nodes.isEmpty())
+ fut = ctx.closures().callAsyncNoFailover(BROADCAST, new GlobalRemoveAllCallable<>(name(), topVer, REMOVE_ALL_BATCH_SIZE, filter), nodes, true);
+
+ if (fut != null)
+ fut.get(); // Wait on the broadcast future before re-checking the topology version.
+
+ } while (ctx.affinity().affinityTopologyVersion() > topVer); // Run another round if the topology version advanced meanwhile.
+ }
+ catch (ClusterGroupEmptyException ignore) { // Not propagated: only logged at debug level.
+ if (log.isDebugEnabled())
+ log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]");
+ }
+ catch (ComputeTaskTimeoutException e) {
+ U.warn(log, "Timed out waiting for remote nodes to finish cache remove (consider increasing " +
+ "'networkTimeout' configuration property) [cacheName=" + name() + "]");
+
+ throw e; // Propagated to the caller after the warning has been logged.
+ }
}
/** {@inheritDoc} */
@@ -4670,6 +4689,97 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
}
/**
+ * Internal callable that removes, in batches, the keys of the named cache for which
+ * the local node is primary at the given topology version.
+ */
+ @GridInternal
+ private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Name of the cache to remove keys from. */
+ private String cacheName;
+
+ /** Topology version used for the primary-node check in {@link #call()}. */
+ private long topVer;
+
+ /** Number of collected keys that triggers a {@code removeAll(keys)} flush in {@link #call()}. */
+ private long rmvBatchSz;
+
+ /** Filters passed to {@code cache.keySet(...)} when enumerating keys. */
+ private IgnitePredicate<GridCacheEntry<K, V>>[] filter;
+
+ /** Injected grid instance. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /**
+ * Empty constructor for serialization.
+ */
+ public GlobalRemoveAllCallable() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param topVer Topology version against which primary ownership is checked.
+ * @param rmvBatchSz Remove batch size (number of keys per {@code removeAll(keys)} call).
+ * @param filter Filters applied when enumerating keys.
+ */
+ private GlobalRemoveAllCallable(String cacheName, long topVer, long rmvBatchSz, IgnitePredicate<GridCacheEntry<K, V>> ... filter) {
+ this.cacheName = cacheName;
+ this.topVer = topVer;
+ this.rmvBatchSz = rmvBatchSz;
+ this.filter = filter;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public Object call() throws Exception {
+ Set<K> keys = new HashSet<>(); // Accumulates the current batch of keys to remove.
+
+ final GridKernal grid = ((GridKernal) ignite);
+
+ final GridCache<K,V> cache = grid.cachex(cacheName);
+
+ final GridCacheContext<K, V> ctx = grid.context().cache().<K, V>internalCache(cacheName).context();
+
+ assert cache != null;
+
+ for (K k : cache.keySet(filter)) {
+ if (ctx.affinity().primary(ctx.localNode(), k, topVer)) // Keep only keys whose primary is the local node at topVer.
+ keys.add(k);
+ if (keys.size() >= rmvBatchSz) { // Batch is full: remove it and start a new one.
+ cache.removeAll(keys);
+
+ keys.clear();
+ }
+ }
+ cache.removeAll(keys); // Remove the last, possibly partial or empty, batch.
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, cacheName);
+ out.writeLong(topVer);
+ out.writeLong(rmvBatchSz);
+ out.writeObject(filter);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ cacheName = U.readString(in); // Fields are read in the same order writeExternal() writes them.
+ topVer = in.readLong();
+ rmvBatchSz = in.readLong();
+ filter = (IgnitePredicate<GridCacheEntry<K, V>>[]) in.readObject();
+ }
+ }
+
+ /**
* Internal callable which performs {@link GridCacheProjection#size()} or {@link GridCacheProjection#primarySize()}
* operation on a cache with the given name.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
index a9896e5..ccb045b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
@@ -896,6 +896,11 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
}
/** {@inheritDoc} */
+ @Override public Set<K> keySet(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
+ return cache.keySet(filter); // NOTE(review): unlike primaryKeySet() below, no entryFilter(...) is applied here; confirm this is intended.
+ }
+
+ /** {@inheritDoc} */
@Override public Set<K> primaryKeySet() {
return cache.primaryKeySet(entryFilter(true));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
index c88183e..b0a4713 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
@@ -1002,6 +1002,18 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
}
+ /** {@inheritDoc} */
+ @Override public Set<K> keySet(@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj); // Enter the gate with prj; the returned value is handed back to leave().
+
+ try {
+ return delegate.keySet(filter); // Forward the call to the wrapped delegate.
+ }
+ finally {
+ gate.leave(prev); // Always leave the gate, even when the delegate throws.
+ }
+ }
+
/** {@inheritDoc} */
@Override public Set<K> primaryKeySet() {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3344e727/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
index fed75ec..6099928 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
@@ -30,6 +30,8 @@ import org.gridgain.testframework.junits.common.*;
*
*/
public class IgniteCacheTest extends GridCommonAbstractTest {
+ private static long ENTRY_COUNT = 1000;
+
/** */
private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
@@ -105,6 +107,24 @@ public class IgniteCacheTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testPutGetRemoveAll() throws Exception {
+ IgniteCache<Integer, String> cache = jcache();
+
+ for (int i = 0; i < ENTRY_COUNT; i++) // Populate keys 0..ENTRY_COUNT-1, each mapped to its string form.
+ cache.put(i, String.valueOf(i));
+
+ for (int i = 0; i < ENTRY_COUNT; i++) // Every stored value must be readable before removal.
+ assertEquals(String.valueOf(i), cache.get(i));
+
+ cache.removeAll(); // Operation under test.
+
+ for (int i = 0; i < ENTRY_COUNT; i++) // After removeAll() no key may still map to a value.
+ assertNull(cache.get(i));
+ }
+
+ /**
* @return Cache.
*/
protected <K, V> IgniteCache<K, V> jcache() {