You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sh...@apache.org on 2016/11/01 02:37:44 UTC
[25/50] [abbrv] ignite git commit: IGNITE-2948 - Optimize usage of
GridCacheConcurrentMap
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
index 05ff71a..e733114 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
@@ -17,367 +17,14 @@
package org.apache.ignite.internal.processors.cache;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.ObjectStreamException;
-import java.lang.reflect.Array;
-import java.util.AbstractCollection;
-import java.util.AbstractSet;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Random;
import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.cache.Cache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.lang.GridTriple;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.LongAdder8;
/**
- * Concurrent implementation of cache map.
+ * Concurrent cache map.
*/
-public class GridCacheConcurrentMap {
- /** Debug flag. */
- private static final boolean DEBUG = false;
-
- /** Random. */
- private static final Random RAND = new Random();
-
- /** The default load factor for this map. */
- private static final float DFLT_LOAD_FACTOR = 0.75f;
-
- /** The default concurrency level for this map. */
- private static final int DFLT_CONCUR_LEVEL = Runtime.getRuntime().availableProcessors() * 2;
-
- /**
- * The maximum capacity, used if a higher value is implicitly specified by either
- * of the constructors with arguments. Must be a power of two <= 1<<30 to ensure
- * that entries are indexable using integers.
- */
- private static final int MAX_CAP = 1 << 30;
-
- /** The maximum number of segments to allow. */
- private static final int MAX_SEGS = 1 << 16; // slightly conservative
-
- /**
- * Mask value for indexing into segments. The upper bits of a
- * key's hash code are used to choose the segment.
- */
- private final int segMask;
-
- /** Shift value for indexing within segments. */
- private final int segShift;
-
- /** The segments, each of which is a specialized hash table. */
- private final Segment[] segs;
-
- /** */
- private GridCacheMapEntryFactory factory;
-
- /** Cache context. */
- protected final GridCacheContext ctx;
-
- /** */
- private final LongAdder8 mapPubSize = new LongAdder8();
-
- /** */
- private final LongAdder8 mapSize = new LongAdder8();
-
- /** Filters cache internal entry. */
- private static final CacheEntryPredicate NON_INTERNAL =
- new CacheEntrySerializablePredicate(new CacheEntryPredicateAdapter() {
- @Override public boolean apply(GridCacheEntryEx entry) {
- return !entry.isInternal();
- }
- });
-
- /** Non-internal predicate array. */
- public static final CacheEntryPredicate[] NON_INTERNAL_ARR = new CacheEntryPredicate[] {NON_INTERNAL};
-
- /** Filters obsolete cache map entry. */
- private final IgnitePredicate<GridCacheMapEntry> obsolete =
- new P1<GridCacheMapEntry>() {
- @Override public boolean apply(GridCacheMapEntry entry) {
- return entry.obsolete();
- }
- };
-
- /**
- * Applies a supplemental hash function to a given hashCode, which
- * defends against poor quality hash functions. This is critical
- * because ConcurrentHashMap uses power-of-two length hash tables,
- * that otherwise encounter collisions for hashCodes that do not
- * differ in lower or upper bits.
- * <p>
- * This function has been taken from Java 8 ConcurrentHashMap with
- * slightly modifications.
- *
- * @param h Value to hash.
- * @return Hash value.
- */
- protected static int hash(int h) {
- return U.hash(h);
- }
-
- /**
- * Returns the segment that should be used for key with given hash
- *
- * @param hash The hash code for the key.
- * @return The segment.
- */
- private Segment segmentFor(int hash) {
- return segs[(hash >>> segShift) & segMask];
- }
-
- /**
- * Creates a new, empty map with the specified initial
- * capacity, load factor and concurrency level.
- *
- * @param ctx Cache context.
- * @param initCap the initial capacity. The implementation
- * performs internal sizing to accommodate this many elements.
- * @param factory Entry factory.
- * @param loadFactor the load factor threshold, used to control resizing.
- * Resizing may be performed when the average number of elements per
- * bin exceeds this threshold.
- * @param concurrencyLevel the estimated number of concurrently
- * updating threads. The implementation performs internal sizing
- * to try to accommodate this many threads.
- * @throws IllegalArgumentException if the initial capacity is
- * negative or the load factor or concurrencyLevel are
- * non-positive.
- */
- @SuppressWarnings({"unchecked"})
- protected GridCacheConcurrentMap(
- GridCacheContext ctx,
- int initCap,
- GridCacheMapEntryFactory factory,
- float loadFactor,
- int concurrencyLevel
- ) {
- this.ctx = ctx;
-
- if (!(loadFactor > 0) || initCap < 0 || concurrencyLevel <= 0)
- throw new IllegalArgumentException();
-
- if (concurrencyLevel > MAX_SEGS)
- concurrencyLevel = MAX_SEGS;
-
- // Find power-of-two sizes best matching arguments
- int sshift = 0;
- int ssize = 1;
-
- while (ssize < concurrencyLevel) {
- ++sshift;
- ssize <<= 1;
- }
-
- segShift = 32 - sshift;
- segMask = ssize - 1;
- segs = (Segment[])Array.newInstance(Segment.class, ssize);
-
- if (initCap > MAX_CAP)
- initCap = MAX_CAP;
-
- int c = initCap / ssize;
-
- if (c * ssize < initCap)
- ++c;
-
- int cap = 1;
-
- while (cap < c)
- cap <<= 1;
-
- if (cap < 16)
- cap = 16;
-
- for (int i = 0; i < segs.length; ++i)
- segs[i] = new Segment(cap, loadFactor);
-
- this.factory = factory;
- }
-
- /**
- * Creates a new, empty map with the specified initial capacity
- * and load factor and with the default concurrencyLevel (16).
- *
- * @param ctx Cache context.
- * @param initCap The implementation performs internal
- * sizing to accommodate this many elements.
- * @param factory Entries factory.
- * @throws IllegalArgumentException if the initial capacity of
- * elements is negative or the load factor is non-positive.
- */
- public GridCacheConcurrentMap(
- GridCacheContext ctx,
- int initCap,
- @Nullable GridCacheMapEntryFactory factory
- ) {
- this(ctx, initCap, factory, DFLT_LOAD_FACTOR, DFLT_CONCUR_LEVEL);
- }
-
- /**
- * Sets factory for entries.
- *
- * @param factory Entry factory.
- */
- public void setEntryFactory(GridCacheMapEntryFactory factory) {
- assert factory != null;
-
- this.factory = factory;
- }
-
- /**
- * @return Entries factory.
- */
- public GridCacheMapEntryFactory getEntryFactory() {
- return factory;
- }
-
- /**
- * @return Non-internal predicate.
- */
- private static CacheEntryPredicate[] nonInternal() {
- return NON_INTERNAL_ARR;
- }
-
- /**
- * @param filter Filter to add to non-internal-key filter.
- * @return Non-internal predicate.
- */
- private static CacheEntryPredicate[] nonInternal(
- CacheEntryPredicate[] filter) {
- return F.asArray(F0.and0(NON_INTERNAL_ARR, filter));
- }
-
- /**
- * @return {@code True} if this map is empty.
- */
- public boolean isEmpty() {
- return mapSize.sum() == 0;
- }
-
- /**
- * Returns the number of key-value mappings in this map.
- *
- * @return the number of key-value mappings in this map.
- */
- public int size() {
- return mapSize.intValue();
- }
-
- /**
- * @return Public size.
- */
- public int publicSize() {
- return mapPubSize.intValue();
- }
-
- /**
- * @param e Cache map entry.
- */
- public void decrementSize(GridCacheMapEntry e) {
- assert !e.isInternal();
- assert Thread.holdsLock(e);
- assert e.deletedUnlocked();
- assert ctx.deferredDelete();
-
- mapPubSize.decrement();
-
- segmentFor(e.hash()).decrementPublicSize();
- }
-
- /**
- * @param e Cache map entry.
- */
- public void incrementSize(GridCacheMapEntry e) {
- assert !e.isInternal();
- assert Thread.holdsLock(e);
- assert !e.deletedUnlocked();
- assert ctx.deferredDelete();
-
- mapPubSize.increment();
-
- segmentFor(e.hash()).incrementPublicSize();
- }
-
- /**
- * @param key Key.
- * @return {@code True} if map contains mapping for provided key.
- */
- public boolean containsKey(Object key) {
- int hash = hash(key.hashCode());
-
- return segmentFor(hash).containsKey(key, hash);
- }
-
- /**
- * Collection of all (possibly {@code null}) values.
- *
- * @param filter Filter.
- * @return a collection view of the values contained in this map.
- */
- public <K, V> Collection<V> allValues(CacheEntryPredicate[] filter) {
- return new Values<>(this, filter);
- }
-
- /**
- * @return Random entry out of hash map.
- */
- @Nullable public GridCacheMapEntry randomEntry() {
- while (true) {
- if (mapPubSize.sum() == 0)
- return null;
-
- // Desired and current indexes.
- int segIdx = RAND.nextInt(segs.length);
-
- Segment seg = null;
-
- for (int i = segIdx; i < segs.length + segIdx; i++) {
- Segment s = segs[i % segs.length];
-
- if (s.publicSize() > 0)
- seg = s;
- }
-
- if (seg == null)
- // It happened so that all public values had been removed from segments.
- return null;
-
- GridCacheMapEntry entry = seg.randomEntry();
-
- if (entry == null)
- continue;
-
- assert !(entry.key() instanceof GridCacheInternal);
-
- return entry;
- }
- }
-
+public interface GridCacheConcurrentMap {
/**
* Returns the entry associated with the specified key in the
* HashMap. Returns null if the HashMap contains no mapping
@@ -386,27 +33,7 @@ public class GridCacheConcurrentMap {
* @param key Key.
* @return Entry.
*/
- @Nullable public GridCacheMapEntry getEntry(Object key) {
- assert key != null;
-
- int hash = hash(key.hashCode());
-
- return segmentFor(hash).get(key, hash);
- }
-
- /**
- * @param topVer Topology version.
- * @param key Key.
- * @param val Value.
- * @return Cache entry for corresponding key-value pair.
- */
- public GridCacheMapEntry putEntry(AffinityTopologyVersion topVer, KeyCacheObject key, @Nullable CacheObject val) {
- assert key != null;
-
- int hash = hash(key.hashCode());
-
- return segmentFor(hash).put(key, hash, val, topVer);
- }
+ @Nullable public GridCacheMapEntry getEntry(KeyCacheObject key);
/**
* @param topVer Topology version.
@@ -416,1633 +43,72 @@ public class GridCacheConcurrentMap {
* @return Triple where the first element is current entry associated with the key,
* the second is created entry and the third is doomed (all may be null).
*/
- public GridTriple<GridCacheMapEntry> putEntryIfObsoleteOrAbsent(
+ @Nullable public GridCacheMapEntry putEntryIfObsoleteOrAbsent(
AffinityTopologyVersion topVer,
KeyCacheObject key,
@Nullable CacheObject val,
- boolean create)
- {
- assert key != null;
-
- int hash = hash(key.hashCode());
-
- return segmentFor(hash).putIfObsolete(key, hash, val, topVer, create);
- }
-
- /**
- * Copies all of the mappings from the specified map to this map
- * These mappings will replace any mappings that
- * this map had for any of the keys currently in the specified map.
- *
- * @param m mappings to be stored in this map.
- * @throws NullPointerException If the specified map is null.
- */
- public void putAll(Map<KeyCacheObject, CacheObject> m) {
- for (Map.Entry<KeyCacheObject, CacheObject> e : m.entrySet())
- putEntry(AffinityTopologyVersion.NONE, e.getKey(), e.getValue());
- }
+ boolean create,
+ boolean touch);
/**
* Removes passed in entry if it presents in the map.
*
- * @param e Entry to remove.
+ * @param entry Entry to remove.
* @return {@code True} if remove happened.
*/
- public boolean removeEntry(GridCacheEntryEx e) {
- assert e != null;
-
- KeyCacheObject key = e.key();
-
- int hash = hash(key.hashCode());
-
- return segmentFor(hash).remove(key, hash, same(e)) != null;
- }
+ public boolean removeEntry(GridCacheEntryEx entry);
/**
- * @param p Entry to check equality.
- * @return Predicate to filter the same (equal by ==) entry.
- */
- private IgnitePredicate<GridCacheMapEntry> same(final GridCacheEntryEx p) {
- return new P1<GridCacheMapEntry>() {
- @Override public boolean apply(GridCacheMapEntry e) {
- return e == p;
- }
- };
- }
-
- /**
- * Removes and returns the entry associated with the specified key
- * in the HashMap if entry is obsolete. Returns null if the HashMap
- * contains no mapping for this key.
+ * Returns the number of key-value mappings in this map.
*
- * @param key Key.
- * @return Removed entry, possibly {@code null}.
- */
- @SuppressWarnings( {"unchecked"})
- @Nullable public GridCacheMapEntry removeEntryIfObsolete(KeyCacheObject key) {
- assert key != null;
-
- int hash = hash(key.hashCode());
-
- return segmentFor(hash).remove(key, hash, obsolete);
- }
-
- /**
- * @param filter Filter.
- * @return Set of the mappings contained in this map.
+ * @return the number of key-value mappings in this map.
*/
- @SuppressWarnings({"unchecked"})
- public <K, V> Set<Cache.Entry<K, V>> entries(CacheEntryPredicate... filter) {
- return new EntrySet<>(this, filter);
- }
+ public int size();
/**
- * Returns entry set containing internal entries.
+ * Returns the number of publicly available key-value mappings in this map.
+ * It excludes entries that are marked as deleted.
*
- * @param filter Filter.
- * @return Set of the mappings contained in this map.
+ * @return the number of publicly available key-value mappings in this map.
*/
- @SuppressWarnings({"unchecked"})
- public <K, V> Set<Cache.Entry<K, V>> entriesx(CacheEntryPredicate... filter) {
- return new EntrySet<>(this, filter, true);
- }
+ public int publicSize();
/**
- * Internal entry set, excluding {@link GridCacheInternal} entries.
+ * Increments public size.
*
- * @return Set of the mappings contained in this map.
+ * @param e Entry that caused public size change.
*/
- public Set<GridCacheEntryEx> entries0() {
- return new Set0<>(this, GridCacheConcurrentMap.nonInternal());
- }
+ public void incrementPublicSize(GridCacheEntryEx e);
/**
- * Get striped entry iterator.
+ * Decrements public size.
*
- * @param id Expected modulo.
- * @param totalCnt Maximum modulo.
- * @return Striped entry iterator.
+ * @param e Entry that caused public size change.
*/
- public Iterator<GridCacheEntryEx> stripedEntryIterator(int id, int totalCnt) {
- return new Iterator0<>(this, false, GridCacheConcurrentMap.nonInternal(), id, totalCnt);
- }
+ public void decrementPublicSize(GridCacheEntryEx e);
- /**
- * Gets all internal entry set, including {@link GridCacheInternal} entries.
- *
- * @return All internal entry set, including {@link GridCacheInternal} entries.
- */
- public Set<GridCacheEntryEx> allEntries0() {
- return new Set0<>(this, CU.empty0());
- }
+ @Nullable public GridCacheMapEntry randomEntry();
/**
- * Key set.
- *
- * @param filter Filter.
- * @return Set of the keys contained in this map.
+ * @return Random entry out of hash map.
*/
- public <K, V> Set<K> keySet(CacheEntryPredicate... filter) {
- return new KeySet<>(this, filter, false);
- }
+ public Set<KeyCacheObject> keySet(CacheEntryPredicate... filter);
/**
- * Key set including internal keys.
- *
* @param filter Filter.
- * @return Set of the keys contained in this map.
+ * @return Iterable of the mappings contained in this map, excluding entries in unvisitable state.
*/
- public <K, V> Set<K> keySetx(CacheEntryPredicate... filter) {
- return new KeySet<>(this, filter, true);
- }
+ public Iterable<GridCacheMapEntry> entries(CacheEntryPredicate... filter);
/**
- * Collection of non-{@code null} values.
- *
* @param filter Filter.
- * @return Collection view of the values contained in this map.
- */
- public <K, V> Collection<V> values(CacheEntryPredicate... filter) {
- return allValues(filter);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheConcurrentMap.class, this, "size", mapSize, "pubSize", mapPubSize);
- }
-
- /**
- *
- */
- @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
- void printDebugInfo() {
- for (Segment s : segs)
- s.lock();
-
- try {
- X.println(">>> Cache map debug info: " + ctx.namexx());
-
- for (int i = 0; i < segs.length; i++) {
- Segment seg = segs[i];
-
- X.println(" Segment [idx=" + i + ", size=" + seg.size() + ']');
-
- HashEntry[] tab = seg.tbl;
-
- for (int j = 0; j < tab.length; j++)
- X.println(" Bucket [idx=" + j + ", bucket=" + tab[j] + ']');
- }
-
- checkConsistency();
- }
- finally {
- for (Segment s : segs)
- s.unlock();
- }
- }
-
- /**
- *
- */
- public void checkConsistency() {
- int size = 0;
- int pubSize = 0;
-
- IgniteLogger log = ctx.logger(GridCacheConcurrentMap.class);
-
- for (Segment s : segs) {
- assert s.isHeldByCurrentThread();
-
- HashEntry[] tab = s.tbl;
-
- for (HashEntry b : tab) {
- if (b != null) {
- HashEntry e = b;
-
- assert e != null;
-
- int cnt = 0;
- int pubCnt = 0;
-
- while (e != null) {
- cnt++;
-
- log.info("Cache map entry: " + e);
-
- if (!e.mapEntry.deleted()) {
- if (!(e.mapEntry.key instanceof GridCacheInternal))
- pubCnt++;
- }
-
- e = e.next;
- }
-
- size += cnt;
- pubSize += pubCnt;
- }
- }
- }
-
- assert size() == size : "Invalid size [expected=" + size() + ", actual=" + size + ']';
- assert publicSize() == pubSize : "Invalid public size [expected=" + publicSize() + ", actual=" + pubSize + ']';
- }
-
- /**
- * Segments are specialized versions of hash tables. This
- * subclasses from ReentrantLock opportunistically,
- * just to simplify some locking and avoid separate construction.
- */
- private class Segment extends ReentrantLock {
- /** */
- private static final long serialVersionUID = 0L;
-
- /*
- * Segments maintain a table of entry lists that are ALWAYS
- * kept in a consistent state, so can be read without locking.
- * Next fields of nodes are immutable (final). All list
- * additions are performed at the front of each bin. This
- * makes it easy to check changes, and also fast to traverse.
- * When nodes would otherwise be changed, new nodes are
- * created to replace them. This works well for hash tables
- * since the bin lists tend to be short. (The average length
- * is less than two for the default load factor threshold.)
- *
- * Read operations can thus proceed without locking, but rely
- * on selected uses of volatiles to ensure that completed
- * write operations performed by other threads are
- * noticed. For most purposes, the "count" field, tracking the
- * number of elements, serves as that volatile variable
- * ensuring visibility. This is convenient because this field
- * needs to be read in many read operations anyway:
- *
- * - All (unsynchronized) read operations must first read the
- * "count" field, and should not look at table entries if
- * it is 0.
- *
- * - All (synchronized) write operations should write to
- * the "count" field after structurally changing any bin.
- * The operations must not take any action that could even
- * momentarily cause a concurrent read operation to see
- * inconsistent data. This is made easier by the nature of
- * the read operations in Map. For example, no operation
- * can reveal that the table has grown but the threshold
- * has not yet been updated, so there are no atomicity
- * requirements for this with respect to reads.
- *
- * As a guide, all critical volatile reads and writes to the
- * count field are marked in code comments.
- */
-
- /**
- * The table is rehashed when its size exceeds this threshold.
- * (The value of this field is always <tt>(int)(capacity * loadFactor)</tt>.)
- */
- private int threshold;
-
- /** The number of public elements in this segment's region. */
- private final LongAdder8 pubSize = new LongAdder8();
-
- /**
- * The load factor for the hash table. Even though this value
- * is same for all segments, it is replicated to avoid needing
- * links to outer object.
- * @serial
- */
- private final float loadFactor;
-
- /** Entry table. */
- private volatile HashEntry[] tbl;
-
- /** The number of elements in this segment's region. */
- private volatile int size;
-
- /**
- * @param initCap Initial capacity.
- * @param lf Load factor.
- */
- @SuppressWarnings("unchecked")
- Segment(int initCap, float lf) {
- loadFactor = lf;
-
- tbl = new HashEntry[initCap];
-
- threshold = (int)(initCap * loadFactor);
- }
-
- /**
- * Returns properly casted first entry for given hash.
- *
- * @param hash Hash.
- * @return Entry for hash.
- */
- @Nullable HashEntry getFirst(HashEntry[] tbl, int hash) {
- return tbl[hash & (tbl.length - 1)];
- }
-
- /**
- * @param key Key.
- * @param hash Hash.
- * @return Value.
- */
- @Nullable GridCacheMapEntry get(Object key, int hash) {
- if (size != 0) {
- HashEntry e = getFirst(tbl, hash);
-
- while (e != null) {
- if (e.mapEntry.hash() == hash && key.equals(e.mapEntry.key()))
- return e.mapEntry;
-
- e = e.next;
- }
- }
-
- return null;
- }
-
- /**
- * @param key Key.
- * @param hash Hash.
- * @return {@code True} if segment contains value.
- */
- boolean containsKey(Object key, int hash) {
- if (size != 0) {
- HashEntry e = getFirst(tbl, hash);
-
- while (e != null) {
- if (e.mapEntry.hash() == hash && key.equals(e.mapEntry.key()))
- return true;
-
- e = e.next;
- }
- }
-
- return false;
- }
-
- /**
- * @param key Key.
- * @param hash Hash.
- * @param val Value.
- * @param topVer Topology version.
- * @return Associated value.
- */
- @SuppressWarnings({"unchecked"})
- GridCacheMapEntry put(KeyCacheObject key, int hash, @Nullable CacheObject val, AffinityTopologyVersion topVer) {
- lock();
-
- try {
- return put0(key, hash, val, topVer);
- }
- finally {
- unlock();
- }
- }
-
- /**
- * @param key Key.
- * @param hash Hash.
- * @param val Value.
- * @param topVer Topology version.
- * @return Associated value.
- */
- @SuppressWarnings({"unchecked", "SynchronizationOnLocalVariableOrMethodParameter"})
- private GridCacheMapEntry put0(KeyCacheObject key, int hash, CacheObject val, AffinityTopologyVersion topVer) {
- try {
- int c = size;
-
- if (c++ > threshold) // Ensure capacity.
- rehash();
-
- HashEntry[] tab = tbl;
-
- int idx = hash & (tab.length - 1);
-
- HashEntry bin = tab[idx];
-
- HashEntry e = bin;
-
- while (e != null && (e.mapEntry.hash() != hash || !key.equals(e.mapEntry.key())))
- e = e.next;
-
- GridCacheMapEntry retVal;
-
- if (e != null) {
- retVal = e.mapEntry;
-
- retVal.rawPut(val, 0);
- }
- else {
- HashEntry next = bin != null ? bin : null;
-
- GridCacheMapEntry newEntry = factory.create(ctx, topVer, key, hash, val);
-
- tab[idx] = new HashEntry(newEntry, next);
-
- retVal = newEntry;
-
- // Modify counters.
- if (!retVal.isInternal()) {
- mapPubSize.increment();
-
- pubSize.increment();
- }
-
- mapSize.increment();
-
- size = c;
- }
-
- return retVal;
- }
- finally {
- if (DEBUG)
- checkSegmentConsistency();
- }
- }
-
- /**
- * @param key Key.
- * @param hash Hash.
- * @param val Value.
- * @param topVer Topology version.
- * @param create Create flag.
- * @return Triple where the first element is current entry associated with the key,
- * the second is created entry and the third is doomed (all may be null).
- */
- @SuppressWarnings( {"unchecked"})
- GridTriple<GridCacheMapEntry> putIfObsolete(KeyCacheObject key,
- int hash,
- @Nullable CacheObject val,
- AffinityTopologyVersion topVer,
- boolean create
- ) {
- lock();
-
- try {
- HashEntry[] tab = tbl;
-
- int idx = hash & (tab.length - 1);
-
- HashEntry bin = tab[idx];
-
- GridCacheMapEntry cur = null;
- GridCacheMapEntry created = null;
- GridCacheMapEntry doomed = null;
-
- if (bin == null) {
- if (create)
- cur = created = put0(key, hash, val, topVer);
-
- return new GridTriple<>(cur, created, doomed);
- }
-
- HashEntry e = bin;
-
- while (e != null && (e.mapEntry.hash() != hash || !key.equals(e.mapEntry.key())))
- e = e.next;
-
- if (e != null) {
- if (e.mapEntry.obsolete()) {
- doomed = remove(key, hash, null);
-
- if (create)
- cur = created = put0(key, hash, val, topVer);
- }
- else
- cur = e.mapEntry;
- }
- else if (create)
- cur = created = put0(key, hash, val, topVer);
-
- return new GridTriple<>(cur, created, doomed);
- }
- finally {
- unlock();
- }
- }
-
- /**
- *
- */
- @SuppressWarnings("unchecked")
- void rehash() {
- HashEntry[] oldTbl = tbl;
-
- int oldCap = oldTbl.length;
-
- if (oldCap >= MAX_CAP)
- return;
-
- /*
- * Reclassify nodes in each list to new Map. Because we are
- * using power-of-two expansion, the elements from each bin
- * must either stay at same index, or move with a power of two
- * offset. We eliminate unnecessary node creation by catching
- * cases where old nodes can be reused because their next
- * fields won't change. Statistically, at the default
- * threshold, only about one-sixth of them need cloning when
- * a table doubles. The nodes they replace will be eligible for GC
- * as soon as they are no longer referenced by any
- * reader thread that may be in the midst of traversing table
- * right now.
- */
- HashEntry[] newTbl = new HashEntry[oldCap << 1];
-
- threshold = (int)(newTbl.length * loadFactor);
-
- int sizeMask = newTbl.length - 1;
-
- for (int i = 0; i < oldCap ; i++) {
- HashEntry e = oldTbl[i];
-
- if (e != null) {
- HashEntry next = e.next;
-
- int idx = e.mapEntry.hash() & sizeMask;
-
- if (next == null) // Single node on list
- newTbl[idx] = e;
- else { // Reuse consecutive sequence at same slot
- HashEntry lastRun = e;
-
- int lastIdx = idx;
-
- for (HashEntry last = next; last != null; last = last.next) {
- int k = last.mapEntry.hash() & sizeMask;
-
- if (k != lastIdx) {
- lastIdx = k;
- lastRun = last;
- }
- }
-
- newTbl[lastIdx] = lastRun;
-
- // Clone remaining nodes
- for (HashEntry p = e; p != lastRun; p = p.next) {
- int k = p.mapEntry.hash() & sizeMask;
-
- HashEntry n = newTbl[k];
-
- newTbl[k] = new HashEntry(p.mapEntry, n);
- }
- }
- }
- }
-
- tbl = newTbl;
-
- if (DEBUG)
- checkSegmentConsistency();
- }
-
- /**
- * Remove; match on key only if value null, else match both.
- *
- * @param key Key.
- * @param hash Hash.
- * @param filter Optional predicate.
- * @return Removed value.
- */
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- @Nullable GridCacheMapEntry remove(Object key, int hash,
- @Nullable IgnitePredicate<GridCacheMapEntry> filter) {
- lock();
-
- try {
- int idx = hash & (tbl.length - 1);
-
- HashEntry bin = tbl[idx];
-
- if (bin == null)
- return null;
-
- HashEntry prev = null;
- HashEntry e = bin;
-
- while (e != null && (e.mapEntry.hash() != hash || !key.equals(e.mapEntry.key))) {
- prev = e;
-
- e = e.next;
- }
-
- if (e != null) {
- if (filter != null && !filter.apply(e.mapEntry))
- return null;
-
- if (prev == null)
- tbl[idx] = e.next;
- else
- prev.next = e.next;
-
- // Modify counters.
- synchronized (e) {
- if (!e.mapEntry.isInternal() && !e.mapEntry.deleted()) {
- mapPubSize.decrement();
-
- pubSize.decrement();
- }
- }
-
- mapSize.decrement();
-
- --size;
-
- return e.mapEntry;
- }
-
- return null;
- }
- finally {
- if (DEBUG)
- checkSegmentConsistency();
-
- unlock();
- }
- }
-
- /**
- * @return Entries count within segment.
- */
- int size() {
- return size;
- }
-
- /**
- * @return Public entries count within segment.
- */
- int publicSize() {
- return pubSize.intValue();
- }
-
- /**
- * Decrements segment public size.
- */
- void decrementPublicSize() {
- pubSize.decrement();
- }
-
- /**
- * Decrements segment public size.
- */
- void incrementPublicSize() {
- pubSize.increment();
- }
-
- /**
- * @return Random cache map entry from this segment.
- */
- @Nullable GridCacheMapEntry randomEntry() {
- if (size == 0)
- return null;
-
- HashEntry[] tbl = this.tbl;
-
- Collection<GridCacheMapEntry> entries = new ArrayList<>(3);
-
- int pubCnt = 0;
-
- int start = RAND.nextInt(tbl.length);
-
- outerLoop:
- {
- for (int i = start; i < start + tbl.length; i++) {
- HashEntry first = tbl[i & (tbl.length - 1)];
-
- if (first == null)
- continue;
-
- for (HashEntry e = first; e != null; e = e.next) {
- if (!e.mapEntry.isInternal())
- pubCnt++;
-
- entries.add(e.mapEntry);
-
- if (entries.size() == 3)
- break outerLoop;
-
- }
- }
- }
-
- if (entries.isEmpty())
- return null;
-
- if (pubCnt == 0)
- return null;
-
- // Desired and current indexes.
- int idx = RAND.nextInt(pubCnt);
-
- int i = 0;
-
- GridCacheMapEntry retVal = null;
-
- for (GridCacheMapEntry e : entries) {
- if (!(e.key instanceof GridCacheInternal)) {
- // In case desired entry was deleted, we return the closest one from left.
- retVal = e;
-
- if (idx == i++)
- break;
- }
- }
-
- return retVal;
- }
-
- /**
- *
- */
- void checkSegmentConsistency() {
- HashEntry[] tbl = this.tbl;
-
- int cnt = 0;
- int pubCnt = 0;
-
- for (HashEntry b : tbl) {
- if (b != null) {
- HashEntry e = b;
-
- assert e != null;
-
- while (e != null) {
- cnt++;
-
- if (!(e.mapEntry.key instanceof GridCacheInternal))
- pubCnt++;
-
- e = e.next;
- }
- }
- }
-
- assert cnt == size : "Entry count and header size mismatch [cnt=" + cnt + ", hdrSize=" +
- size + ", segment=" + this + ']';
- assert pubCnt == pubSize.intValue();
- }
- }
-
- /**
- * Iterator over {@link GridCacheEntryEx} elements.
- *
- * @param <K> Key type.
- * @param <V> Value type.
+ * @return Iterable of the mappings contained in this map, including entries in unvisitable state.
*/
- private static class Iterator0<K, V> implements Iterator<GridCacheEntryEx>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private int nextSegIdx;
-
- /** */
- private int nextTblIdx;
-
- /** */
- private HashEntry[] curTbl;
-
- /** */
- private HashEntry nextEntry;
-
- /** Next entry to return. */
- private HashEntry next;
-
- /** Next value. */
- private V nextVal;
-
- /** Current value. */
- private V curVal;
-
- /** */
- private boolean isVal;
-
- /** Current entry. */
- private HashEntry cur;
-
- /** Iterator filter. */
- private CacheEntryPredicate[] filter;
-
- /** Outer cache map. */
- private GridCacheConcurrentMap map;
-
- /** Cache context. */
- private GridCacheContext<K, V> ctx;
-
- /** Mod. */
- private int id;
-
- /** Mod count. */
- private int totalCnt;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public Iterator0() {
- // No-op.
- }
-
- /**
- * @param map Cache map.
- * @param isVal {@code True} if value iterator.
- * @param filter Entry filter.
- * @param id ID of the iterator.
- * @param totalCnt Total count of iterators.
- */
- @SuppressWarnings({"unchecked"})
- Iterator0(GridCacheConcurrentMap map, boolean isVal,
- CacheEntryPredicate[] filter, int id, int totalCnt) {
- this.filter = filter;
- this.isVal = isVal;
- this.id = id;
- this.totalCnt = totalCnt;
-
- this.map = map;
-
- ctx = map.ctx;
-
- nextSegIdx = map.segs.length - 1;
- nextTblIdx = -1;
-
- advance();
- }
-
- /**
- *
- */
- @SuppressWarnings({"unchecked"})
- private void advance() {
- if (nextEntry != null && advanceInBucket(nextEntry, true))
- return;
-
- while (nextTblIdx >= 0) {
- HashEntry bucket = curTbl[nextTblIdx--];
-
- if (bucket != null && advanceInBucket(bucket, false))
- return;
- }
-
- while (nextSegIdx >= 0) {
- int nextSegIdx0 = nextSegIdx--;
-
- GridCacheConcurrentMap.Segment seg = map.segs[nextSegIdx0];
-
- if (seg.size != 0 && (id == -1 || nextSegIdx0 % totalCnt == id)) {
- curTbl = seg.tbl;
-
- for (int j = curTbl.length - 1; j >= 0; --j) {
- HashEntry bucket = curTbl[j];
-
- if (bucket != null && advanceInBucket(bucket, false)) {
- nextTblIdx = j - 1;
-
- return;
- }
- }
- }
- }
- }
-
- /**
- * @param e Current next.
- * @param skipFirst {@code True} to skip check on first iteration.
- * @return {@code True} if advance succeeded.
- */
- @SuppressWarnings( {"unchecked"})
- private boolean advanceInBucket(@Nullable HashEntry e, boolean skipFirst) {
- if (e == null)
- return false;
-
- nextEntry = e;
-
- do {
- if (!skipFirst) {
- next = nextEntry;
-
- // Check if entry is visitable first before doing projection-aware peek.
- if (!next.mapEntry.visitable(filter))
- continue;
-
- if (isVal) {
- nextVal = next.mapEntry.<K, V>wrap().getValue();
-
- if (nextVal == null)
- continue;
- }
-
- return true;
- }
-
- // Perform checks in any case.
- skipFirst = false;
- }
- while ((nextEntry = nextEntry.next) != null);
-
- next = null;
- nextVal = null;
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- return next != null && (!isVal || nextVal != null);
- }
-
- /**
- * @return Next value.
- */
- public V currentValue() {
- return curVal;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public GridCacheEntryEx next() {
- HashEntry e = next;
- V v = nextVal;
-
- if (e == null)
- throw new NoSuchElementException();
-
- advance();
-
- cur = e;
- curVal = v;
-
- return cur.mapEntry;
- }
-
- /** {@inheritDoc} */
- @Override public void remove() {
- if (cur == null)
- throw new IllegalStateException();
-
- HashEntry e = cur;
-
- cur = null;
- curVal = null;
-
- try {
- ((IgniteKernal)ctx.grid()).getCache(ctx.name()).getAndRemove(e.mapEntry.key);
- }
- catch (IgniteCheckedException ex) {
- throw new IgniteException(ex);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(ctx);
- out.writeObject(filter);
- out.writeBoolean(isVal);
- out.writeInt(id);
- out.writeInt(totalCnt);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- ctx = (GridCacheContext<K, V>)in.readObject();
- filter = (CacheEntryPredicate[])in.readObject();
- isVal = in.readBoolean();
- id = in.readInt();
- totalCnt = in.readInt();
- }
-
- /**
- * Reconstructs object on unmarshalling.
- *
- * @return Reconstructed object.
- * @throws ObjectStreamException Thrown in case of unmarshalling error.
- */
- protected Object readResolve() throws ObjectStreamException {
- return new Iterator0<>(ctx.cache().map(), isVal, filter, id, totalCnt);
- }
- }
+ public Iterable<GridCacheMapEntry> allEntries(CacheEntryPredicate... filter);
/**
- * Entry set.
- */
- @SuppressWarnings("unchecked")
- private static class Set0<K, V> extends AbstractSet<GridCacheEntryEx> implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Filter. */
- private CacheEntryPredicate[] filter;
-
- /** Base map. */
- private GridCacheConcurrentMap map;
-
- /** Context. */
- private GridCacheContext<K, V> ctx;
-
- /** */
- private CacheOperationContext opCtxPerCall;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public Set0() {
- // No-op.
- }
-
- /**
- * @param map Base map.
- * @param filter Filter.
- */
- private Set0(GridCacheConcurrentMap map, CacheEntryPredicate[] filter) {
- assert map != null;
-
- this.map = map;
- this.filter = filter;
-
- ctx = map.ctx;
-
- opCtxPerCall = ctx.operationContextPerCall();
- }
-
- /** {@inheritDoc} */
- @NotNull @Override public Iterator<GridCacheEntryEx> iterator() {
- return new Iterator0<>(map, false, filter, -1, -1);
- }
-
- /**
- * @return Entry iterator.
- */
- Iterator<Cache.Entry<K, V>> entryIterator() {
- return new EntryIterator<>(map, filter, ctx, opCtxPerCall);
- }
-
- /**
- * @return Key iterator.
- */
- Iterator<K> keyIterator() {
- return new KeyIterator<>(map, opCtxPerCall != null && opCtxPerCall.isKeepBinary(), filter);
- }
-
- /**
- * @return Value iterator.
- */
- Iterator<V> valueIterator() {
- return new ValueIterator<>(map, filter, ctx);
- }
-
- /**
- * Checks for key containment.
- *
- * @param k Key to check.
- * @return {@code True} if key is in the map.
- */
- boolean containsKey(K k) {
- KeyCacheObject cacheKey = ctx.toCacheKeyObject(k);
-
- GridCacheEntryEx e = ctx.cache().peekEx(cacheKey);
-
- try {
- return e != null && !e.obsolete() &&
- (!e.deleted() || e.lockedByThread()) &&
- F.isAll(e, filter);
- }
- catch (GridCacheEntryRemovedException ignore) {
- return false;
- }
- }
-
- /**
- * @param v Checks if value is contained in
- * @return {@code True} if value is in the set.
- */
- boolean containsValue(V v) {
- A.notNull(v, "value");
-
- for (Iterator<V> it = valueIterator(); it.hasNext(); ) {
- V v0 = it.next();
-
- if (F.eq(v0, v))
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean contains(Object o) {
- if (!(o instanceof GridCacheEntryEx))
- return false;
-
- GridCacheEntryEx e = (GridCacheEntryEx)o;
-
- GridCacheEntryEx cur = ctx.cache().peekEx(e.key());
-
- return cur != null && cur.equals(e);
- }
-
- /** {@inheritDoc} */
- @Override public boolean remove(Object o) {
- return o instanceof Cache.Entry && removeKey(((Map.Entry<K, V>)o).getKey());
- }
-
- /**
- * @param k Key to remove.
- * @return If key has been removed.
- */
- boolean removeKey(K k) {
- try {
- return ((IgniteKernal)ctx.grid()).getCache(ctx.name()).remove(k, CU.<K, V>empty());
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException("Failed to remove cache entry for key: " + k, e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return F.isEmpty(filter) ? map.publicSize() : F.size(iterator());
- }
-
- /** {@inheritDoc} */
- @Override public boolean isEmpty() {
- return F.isEmpty(filter) ? map.publicSize() == 0 : !iterator().hasNext();
- }
-
- /** {@inheritDoc} */
- @Override public void clear() {
- ctx.cache().clearLocallyAll(new KeySet<K, V>(map, filter, false), true, true, false);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(ctx);
- out.writeObject(filter);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- ctx = (GridCacheContext<K, V>)in.readObject();
- filter = (CacheEntryPredicate[])in.readObject();
- }
-
- /**
- * Reconstructs object on unmarshalling.
- *
- * @return Reconstructed object.
- * @throws ObjectStreamException Thrown in case of unmarshalling error.
- */
- protected Object readResolve() throws ObjectStreamException {
- return new Set0<>(ctx.cache().map(), filter);
- }
- }
-
- /**
- * Iterator over hash table.
- * <p>
- * Note, class is static for {@link Externalizable}.
- */
- private static class EntryIterator<K, V> implements Iterator<Cache.Entry<K, V>>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Base iterator. */
- private Iterator0<K, V> it;
-
- /** */
- private GridCacheContext<K, V> ctx;
-
- /** */
- private CacheOperationContext opCtxPerCall;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public EntryIterator() {
- // No-op.
- }
-
- /**
- * @param map Cache map.
- * @param filter Entry filter.
- * @param ctx Cache context.
- * @param opCtxPerCall Operation context per call.
- */
- EntryIterator(
- GridCacheConcurrentMap map,
- CacheEntryPredicate[] filter,
- GridCacheContext<K, V> ctx,
- CacheOperationContext opCtxPerCall) {
- it = new Iterator0<>(map, false, filter, -1, -1);
-
- this.ctx = ctx;
- this.opCtxPerCall = opCtxPerCall;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- return it.hasNext();
- }
-
- /** {@inheritDoc} */
- @Override public Cache.Entry<K, V> next() {
- CacheOperationContext old = ctx.operationContextPerCall();
-
- ctx.operationContextPerCall(opCtxPerCall);
-
- try {
- return it.next().wrapLazyValue();
- }
- finally {
- ctx.operationContextPerCall(old);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void remove() {
- it.remove();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(it);
- out.writeObject(ctx);
- out.writeObject(opCtxPerCall);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- it = (Iterator0<K, V>)in.readObject();
- ctx = (GridCacheContext<K, V>)in.readObject();
- opCtxPerCall = (CacheOperationContext)in.readObject();
- }
- }
-
- /**
- * Value iterator.
- * <p>
- * Note that class is static for {@link Externalizable}.
- */
- private static class ValueIterator<K, V> implements Iterator<V>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Hash table iterator. */
- private Iterator0<K, V> it;
-
- /** Context. */
- private GridCacheContext<K, V> ctx;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public ValueIterator() {
- // No-op.
- }
-
- /**
- * @param map Base map.
- * @param filter Value filter.
- * @param ctx Cache context.
- */
- private ValueIterator(
- GridCacheConcurrentMap map,
- CacheEntryPredicate[] filter,
- GridCacheContext ctx) {
- it = new Iterator0<>(map, true, filter, -1, -1);
-
- this.ctx = ctx;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- return it.hasNext();
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public V next() {
- it.next();
-
- // Cached value.
- return it.currentValue();
- }
-
- /** {@inheritDoc} */
- @Override public void remove() {
- it.remove();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(it);
- out.writeObject(ctx);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- it = (Iterator0)in.readObject();
- ctx = (GridCacheContext<K, V>)in.readObject();
- }
- }
-
- /**
- * Key iterator.
- */
- private static class KeyIterator<K, V> implements Iterator<K>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Hash table iterator. */
- private Iterator0<K, V> it;
-
- /** Keep binary flag. */
- private boolean keepBinary;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public KeyIterator() {
- // No-op.
- }
-
- /**
- * @param map Cache map.
- * @param filter Filter.
- */
- private KeyIterator(GridCacheConcurrentMap map, boolean keepBinary, CacheEntryPredicate[] filter) {
- it = new Iterator0<>(map, false, filter, -1, -1);
- this.keepBinary = keepBinary;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- return it.hasNext();
- }
-
- /** {@inheritDoc} */
- @Override public K next() {
- return (K)it.ctx.cacheObjectContext().unwrapBinaryIfNeeded(it.next().key(), keepBinary, true);
- }
-
- /** {@inheritDoc} */
- @Override public void remove() {
- it.remove();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(it);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- it = (Iterator0)in.readObject();
- }
- }
-
- /**
- * Key set.
- */
- private static class KeySet<K, V> extends AbstractSet<K> implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Base entry set. */
- private Set0<K, V> set;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public KeySet() {
- // No-op.
- }
-
- /**
- * @param map Base map.
- * @param filter Key filter.
- * @param internal Whether to allow internal keys.
- */
- private KeySet(GridCacheConcurrentMap map, CacheEntryPredicate[] filter, boolean internal) {
- assert map != null;
-
- set = new Set0<>(map, internal ? filter : nonInternal(filter));
- }
-
- /** {@inheritDoc} */
- @NotNull @Override public Iterator<K> iterator() {
- return set.keyIterator();
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return set.size();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public boolean contains(Object o) {
- return set.containsKey((K)o);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public boolean remove(Object o) {
- return set.removeKey((K)o);
- }
-
- /** {@inheritDoc} */
- @Override public void clear() {
- set.clear();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(set);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- set = (Set0<K, V>)in.readObject();
- }
- }
-
- /**
- * Value set.
- * <p>
- * Note that the set is static for {@link Externalizable} support.
- */
- private static class Values<K, V> extends AbstractCollection<V> implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Base entry set. */
- private Set0<K, V> set;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public Values() {
- // No-op.
- }
-
- /**
- * @param map Base map.
- * @param filter Value filter.
- */
- private Values(GridCacheConcurrentMap map, CacheEntryPredicate[] filter) {
- assert map != null;
-
- set = new Set0<>(map, nonInternal(filter));
- }
-
- /** {@inheritDoc} */
- @NotNull @Override public Iterator<V> iterator() {
- return set.valueIterator();
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return set.size();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public boolean contains(Object o) {
- return set.containsValue((V)o);
- }
-
- /** {@inheritDoc} */
- @Override public void clear() {
- set.clear();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(set);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- set = (Set0<K, V>)in.readObject();
- }
- }
-
- /**
- * Entry set.
- */
- private static class EntrySet<K, V> extends AbstractSet<Cache.Entry<K, V>> implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Base entry set. */
- private Set0<K, V> set;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public EntrySet() {
- // No-op.
- }
-
- /**
- * @param map Base map.
- * @param filter Key filter.
- */
- private EntrySet(GridCacheConcurrentMap map, CacheEntryPredicate[] filter) {
- this(map, filter, false);
- }
-
- /**
- * @param map Base map.
- * @param filter Key filter.
- * @param internal Whether to allow internal entries.
- */
- private EntrySet(GridCacheConcurrentMap map, CacheEntryPredicate[] filter,
- boolean internal) {
- assert map != null;
-
- set = new Set0<>(map, internal ? filter : nonInternal(filter));
- }
-
- /** {@inheritDoc} */
- @NotNull @Override public Iterator<Cache.Entry<K, V>> iterator() {
- return set.entryIterator();
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return set.size();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isEmpty() {
- return set.isEmpty();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public boolean contains(Object o) {
- if (o instanceof CacheEntryImpl) {
- GridCacheEntryEx unwrapped = set.map.getEntry(((CacheEntryImpl)o).getKey());
-
- return unwrapped != null && set.contains(unwrapped);
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public boolean remove(Object o) {
- return set.removeKey((K)o);
- }
-
- /** {@inheritDoc} */
- @Override public void clear() {
- set.clear();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(set);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- set = (Set0<K, V>)in.readObject();
- }
- }
-
- /**
- *
+ * @param filter Filter.
+ * @return Set of the mappings contained in this map.
*/
- private static class HashEntry {
- /** */
- private final GridCacheMapEntry mapEntry;
-
- /** */
- @GridToStringExclude
- private volatile HashEntry next;
-
- /**
- * @param mapEntry Entry.
- * @param next Next.
- */
- private HashEntry(
- GridCacheMapEntry mapEntry,
- HashEntry next
- ) {
- this.mapEntry = mapEntry;
- this.next = next;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HashEntry.class, this, super.toString());
- }
- }
+ public Set<GridCacheMapEntry> entrySet(CacheEntryPredicate... filter);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
new file mode 100644
index 0000000..48dae76
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_ENTRY_CREATED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_ENTRY_DESTROYED;
+
+/**
+ * Implementation of concurrent cache map.
+ */
+public class GridCacheConcurrentMapImpl implements GridCacheConcurrentMap {
+ /** Default load factor. */
+ private static final float DFLT_LOAD_FACTOR = 0.75f;
+
+ /** Default concurrency level. */
+ private static final int DFLT_CONCUR_LEVEL = Runtime.getRuntime().availableProcessors() * 2;
+
+ /** Internal map. */
+ private final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map;
+
+ /** Map entry factory. */
+ private final GridCacheMapEntryFactory factory;
+
+ /** Cache context. */
+ private final GridCacheContext ctx;
+
+ /** Public size counter. */
+ private final AtomicInteger pubSize = new AtomicInteger();
+
+ /**
+ * Creates a new, empty map with the specified initial
+ * capacity.
+ *
+ * @param ctx Cache context.
+ * @param factory Entry factory.
+ * @param initialCapacity the initial capacity. The implementation
+ * performs internal sizing to accommodate this many elements.
+
+ * @throws IllegalArgumentException if the initial capacity is
+ * negative.
+ */
+ public GridCacheConcurrentMapImpl(GridCacheContext ctx, GridCacheMapEntryFactory factory, int initialCapacity) {
+ this(ctx, factory, initialCapacity, DFLT_LOAD_FACTOR, DFLT_CONCUR_LEVEL);
+ }
+
+ /**
+ * Creates a new, empty map with the specified initial
+ * capacity, load factor and concurrency level.
+ *
+ * @param ctx Cache context.
+ * @param factory Entry factory.
+ * @param initialCapacity the initial capacity. The implementation
+ * performs internal sizing to accommodate this many elements.
+ * @param loadFactor the load factor threshold, used to control resizing.
+ * Resizing may be performed when the average number of elements per
+ * bin exceeds this threshold.
+ * @param concurrencyLevel the estimated number of concurrently
+ * updating threads. The implementation performs internal sizing
+ * to try to accommodate this many threads.
+ * @throws IllegalArgumentException if the initial capacity is
+ * negative or the load factor or concurrencyLevel are
+ * non-positive.
+ */
+ public GridCacheConcurrentMapImpl(
+ GridCacheContext ctx,
+ GridCacheMapEntryFactory factory,
+ int initialCapacity,
+ float loadFactor,
+ int concurrencyLevel
+ ) {
+ this.ctx = ctx;
+ this.factory = factory;
+
+ map = new ConcurrentHashMap8<>(initialCapacity, loadFactor, concurrencyLevel);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) {
+ return map.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(final AffinityTopologyVersion topVer,
+ KeyCacheObject key, @Nullable final CacheObject val, final boolean create, final boolean touch) {
+
+ GridCacheMapEntry cur = null;
+ GridCacheMapEntry created = null;
+ GridCacheMapEntry created0 = null;
+ GridCacheMapEntry doomed = null;
+
+ boolean done = false;
+
+ while (!done) {
+ GridCacheMapEntry entry = map.get(key);
+ created = null;
+ doomed = null;
+
+ if (entry == null) {
+ if (create) {
+ if (created0 == null)
+ created0 = factory.create(ctx, topVer, key, key.hashCode(), val);
+
+ cur = created = created0;
+
+ done = map.putIfAbsent(created.key(), created) == null;
+ }
+ else
+ done = true;
+ }
+ else {
+ if (entry.obsolete()) {
+ doomed = entry;
+
+ if (create) {
+ if (created0 == null)
+ created0 = factory.create(ctx, topVer, key, key.hashCode(), val);
+
+ cur = created = created0;
+
+ done = map.replace(entry.key(), doomed, created);
+ }
+ else
+ done = map.remove(entry.key(), doomed);
+ }
+ else {
+ cur = entry;
+
+ done = true;
+ }
+ }
+ }
+
+ int sizeChange = 0;
+
+ if (doomed != null) {
+ synchronized (doomed) {
+ if (!doomed.deleted())
+ sizeChange--;
+ }
+
+ if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED))
+ ctx.events().addEvent(doomed.partition(),
+ doomed.key(),
+ ctx.localNodeId(),
+ (IgniteUuid)null,
+ null,
+ EVT_CACHE_ENTRY_DESTROYED,
+ null,
+ false,
+ null,
+ false,
+ null,
+ null,
+ null,
+ true);
+ }
+
+ if (created != null) {
+ sizeChange++;
+
+ if (ctx.events().isRecordable(EVT_CACHE_ENTRY_CREATED))
+ ctx.events().addEvent(created.partition(),
+ created.key(),
+ ctx.localNodeId(),
+ (IgniteUuid)null,
+ null,
+ EVT_CACHE_ENTRY_CREATED,
+ null,
+ false,
+ null,
+ false,
+ null,
+ null,
+ null,
+ true);
+
+ if (touch)
+ ctx.evicts().touch(
+ cur,
+ topVer);
+ }
+
+ if (sizeChange != 0)
+ pubSize.addAndGet(sizeChange);
+
+ return cur;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removeEntry(final GridCacheEntryEx entry) {
+ boolean removed = map.remove(entry.key(), entry);
+
+ if (removed) {
+ if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED))
+ // Event notification.
+ ctx.events().addEvent(entry.partition(), entry.key(), ctx.localNodeId(), (IgniteUuid)null, null,
+ EVT_CACHE_ENTRY_DESTROYED, null, false, null, false, null, null, null, false);
+
+ synchronized (entry) {
+ if (!entry.deleted())
+ decrementPublicSize(entry);
+ }
+ }
+
+ return removed;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return map.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int publicSize() {
+ return pubSize.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrementPublicSize(GridCacheEntryEx e) {
+ pubSize.incrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void decrementPublicSize(GridCacheEntryEx e) {
+ pubSize.decrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<KeyCacheObject> keySet(final CacheEntryPredicate... filter) {
+ final IgnitePredicate<KeyCacheObject> p = new IgnitePredicate<KeyCacheObject>() {
+ @Override public boolean apply(KeyCacheObject key) {
+ GridCacheMapEntry entry = map.get(key);
+
+ return entry != null && entry.visitable(filter);
+ }
+ };
+
+ return new AbstractSet<KeyCacheObject>() {
+ @Override public Iterator<KeyCacheObject> iterator() {
+ return F.iterator0(map.keySet(), true, p);
+ }
+
+ @Override public int size() {
+ return F.size(iterator());
+ }
+
+ @Override public boolean contains(Object o) {
+ if (!(o instanceof KeyCacheObject))
+ return false;
+
+ return map.keySet().contains(o) && p.apply((KeyCacheObject)o);
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<GridCacheMapEntry> entries(final CacheEntryPredicate... filter) {
+ final IgnitePredicate<GridCacheMapEntry> p = new IgnitePredicate<GridCacheMapEntry>() {
+ @Override public boolean apply(GridCacheMapEntry entry) {
+ return entry.visitable(filter);
+ }
+ };
+
+ return F.viewReadOnly(map.values(), F.<GridCacheMapEntry>identity(), p);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<GridCacheMapEntry> allEntries(final CacheEntryPredicate... filter) {
+ final IgnitePredicate<GridCacheMapEntry> p = new IgnitePredicate<GridCacheMapEntry>() {
+ @Override public boolean apply(GridCacheMapEntry entry) {
+ return F.isAll(entry, filter);
+ }
+ };
+
+ return F.viewReadOnly(map.values(), F.<GridCacheMapEntry>identity(), p);
+ }
+
+ /** {@inheritDoc} */
+ @Deprecated @Nullable @Override public GridCacheMapEntry randomEntry() {
+ Iterator<GridCacheMapEntry> iterator = map.values().iterator();
+
+ if (iterator.hasNext())
+ return iterator.next();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<GridCacheMapEntry> entrySet(final CacheEntryPredicate... filter) {
+ final IgnitePredicate<GridCacheMapEntry> p = new IgnitePredicate<GridCacheMapEntry>() {
+ @Override public boolean apply(GridCacheMapEntry entry) {
+ return entry.visitable(filter);
+ }
+ };
+
+ return new AbstractSet<GridCacheMapEntry>() {
+ @Override public Iterator<GridCacheMapEntry> iterator() {
+ return F.iterator0(map.values(), true, p);
+ }
+
+ @Override public int size() {
+ return F.size(iterator());
+ }
+
+ @Override public boolean contains(Object o) {
+ if (!(o instanceof GridCacheMapEntry))
+ return false;
+
+ GridCacheMapEntry entry = (GridCacheMapEntry)o;
+
+ return entry.equals(map.get(entry.key())) && p.apply(entry);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 88d6e04..7ad6c77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -59,7 +59,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
@@ -589,14 +588,7 @@ public class GridCacheContext<K, V> implements Externalizable {
assert e != null;
assert !e.isInternal() : e;
- cache.map().incrementSize(e);
-
- if (isDht() || isColocated() || isDhtAtomic()) {
- GridDhtLocalPartition part = topology().localPartition(e.partition(), AffinityTopologyVersion.NONE, false);
-
- if (part != null)
- part.incrementPublicSize();
- }
+ cache.incrementSize(e);
}
/**
@@ -607,14 +599,7 @@ public class GridCacheContext<K, V> implements Externalizable {
assert e != null;
assert !e.isInternal() : e;
- cache.map().decrementSize(e);
-
- if (isDht() || isColocated() || isDhtAtomic()) {
- GridDhtLocalPartition part = topology().localPartition(e.partition(), AffinityTopologyVersion.NONE, false);
-
- if (part != null)
- part.decrementPublicSize();
- }
+ cache.decrementSize(e);
}
/**
@@ -1097,8 +1082,8 @@ public class GridCacheContext<K, V> implements Externalizable {
for (CacheEntryPredicate p0 : p) {
if ((p0 instanceof CacheEntrySerializablePredicate) &&
- ((CacheEntrySerializablePredicate)p0).predicate() instanceof CacheEntryPredicateNoValue)
- return true;
+ ((CacheEntrySerializablePredicate)p0).predicate() instanceof CacheEntryPredicateNoValue)
+ return true;
}
return false;
@@ -1155,7 +1140,7 @@ public class GridCacheContext<K, V> implements Externalizable {
*/
@SuppressWarnings({"unchecked"})
public IgnitePredicate<Cache.Entry<K, V>>[] vararg(IgnitePredicate<Cache.Entry<K, V>> p) {
- return p == null ? CU.<K, V>empty() : new IgnitePredicate[]{p};
+ return p == null ? CU.<K, V>empty() : new IgnitePredicate[] {p};
}
/**
@@ -1320,8 +1305,6 @@ public class GridCacheContext<K, V> implements Externalizable {
return (opCtx != null && opCtx.skipStore());
}
-
-
/**
* @return {@code True} if need check near cache context.
*/
@@ -1804,9 +1787,20 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return Cache key object.
*/
public KeyCacheObject toCacheKeyObject(Object obj) {
+ return toCacheKeyObject(obj, false);
+ }
+
+ /**
+ * @param obj Object.
+ * @return Cache key object.
+ */
+ public KeyCacheObject toCacheKeyObject(Object obj, boolean includePartition) {
assert validObjectForCache(obj) : obj;
- return cacheObjects().toCacheKeyObject(cacheObjCtx, obj, true);
+ if (includePartition)
+ return cacheObjects().toCacheKeyObject(cacheObjCtx, obj, true, affinity().partition(obj));
+ else
+ return cacheObjects().toCacheKeyObject(cacheObjCtx, obj, true);
}
/**
@@ -1838,8 +1832,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @throws IgniteCheckedException If failed.
*/
@Nullable public CacheObject unswapCacheObject(byte type, byte[] bytes, @Nullable IgniteUuid clsLdrId)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
if (ctx.config().isPeerClassLoadingEnabled() && type != CacheObject.TYPE_BYTE_ARR) {
ClassLoader ldr = clsLdrId != null ? deploy().getClassLoader(clsLdrId) : deploy().localLoader();
@@ -1973,15 +1966,24 @@ public class GridCacheContext<K, V> implements Externalizable {
/**
* @param keys Keys.
- * @return Co
+ * @return Read-only collection of KeyCacheObject instances.
*/
public Collection<KeyCacheObject> cacheKeysView(Collection<?> keys) {
+ return cacheKeysView(keys, false);
+ }
+
+ /**
+ * @param keys Keys.
+ * @param includePartition Include partition.
+ * @return Read-only collection of KeyCacheObject instances.
+ */
+ public Collection<KeyCacheObject> cacheKeysView(Collection<?> keys, final boolean includePartition) {
return F.viewReadOnly(keys, new C1<Object, KeyCacheObject>() {
@Override public KeyCacheObject apply(Object key) {
if (key == null)
throw new NullPointerException("Null key.");
- return toCacheKeyObject(key);
+ return toCacheKeyObject(key, includePartition);
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
index d54ecd6..8e66233 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
@@ -304,7 +304,7 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
private void addEntries(ClassLoader ldr, Collection<KeyCacheObject> keys, GridCacheAdapter cache) {
GridCacheContext cacheCtx = cache.context();
- for (GridCacheEntryEx e : (Collection<GridCacheEntryEx>)cache.entries()) {
+ for (GridCacheEntryEx e : (Iterable<GridCacheEntryEx>)cache.entries()) {
boolean undeploy = cacheCtx.isNear() ?
undeploy(ldr, e, cacheCtx.near()) || undeploy(ldr, e, cacheCtx.near().dht()) :
undeploy(ldr, e, cacheCtx.cache());
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntrySet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntrySet.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntrySet.java
deleted file mode 100644
index 1dbc564..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntrySet.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.util.AbstractSet;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import javax.cache.Cache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Entry set backed by cache itself.
- */
-public class GridCacheEntrySet<K, V> extends AbstractSet<Cache.Entry<K, V>> {
- /** Cache context. */
- private final GridCacheContext<K, V> ctx;
-
- /** Filter. */
- private final IgnitePredicate<Cache.Entry<K, V>>[] filter;
-
- /** Base set. */
- private final Set<Cache.Entry<K, V>> set;
-
- /**
- * @param ctx Cache context.
- * @param c Entry collection.
- * @param filter Filter.
- */
- public GridCacheEntrySet(GridCacheContext<K, V> ctx, Collection<? extends Cache.Entry<K, V>> c,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
- set = new HashSet<>(c.size(), 1.0f);
-
- assert ctx != null;
-
- this.ctx = ctx;
- this.filter = filter;
-
- for (Cache.Entry<K, V> e : c) {
- if (e != null)
- set.add(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<Cache.Entry<K, V>> iterator() {
- return new GridCacheIterator<>(ctx, set, F.<Cache.Entry<K, V>>identity(), filter);
- }
-
- /** {@inheritDoc} */
- @Override public void clear() {
- throw new UnsupportedOperationException("clear");
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public boolean remove(Object o) {
- if (!(o instanceof CacheEntryImpl))
- return false;
-
- Cache.Entry<K, V> e = (Cache.Entry<K,V>)o;
-
- if (F.isAll(e, filter) && set.remove(e)) {
- try {
- ((IgniteKernal)ctx.grid()).getCache(ctx.name()).remove(e.getKey(), e.getValue());
- }
- catch (IgniteCheckedException ex) {
- throw new IgniteException(ex);
- }
-
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return F.size(set, filter);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public boolean contains(Object o) {
- if (!(o instanceof CacheEntryImpl))
- return false;
-
- Cache.Entry<K,V> e = (Cache.Entry<K, V>)o;
-
- return F.isAll(e, filter) && set.contains(e);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index a21e18b..c102c58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -1514,7 +1514,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (log.isDebugEnabled())
log.debug("Touching partition entries: " + part);
- touchOnTopologyChange(part.entries());
+ touchOnTopologyChange(part.allEntries());
}
}
}