You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sh...@apache.org on 2016/11/01 02:37:45 UTC
[26/50] [abbrv] ignite git commit: IGNITE-2948 - Optimize usage of GridCacheConcurrentMap
IGNITE-2948 - Optimize usage of GridCacheConcurrentMap
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b7470b3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b7470b3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b7470b3
Branch: refs/heads/ignite-2788
Commit: 0b7470b381cf6571577bef7a2dd3f1833dce61e5
Parents: bc98fa5
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue Apr 26 15:07:28 2016 +0300
Committer: shtykh_roman <rs...@yahoo.com>
Committed: Fri May 13 16:11:15 2016 +0900
----------------------------------------------------------------------
.../internal/benchmarks/model/IntValue.java | 19 +-
.../internal/binary/BinaryObjectImpl.java | 14 +
.../processors/cache/GridCacheAdapter.java | 448 ++--
.../cache/GridCacheAffinityManager.java | 10 +-
.../cache/GridCacheClearAllRunnable.java | 2 +-
.../cache/GridCacheConcurrentMap.java | 1996 +-----------------
.../cache/GridCacheConcurrentMapImpl.java | 344 +++
.../processors/cache/GridCacheContext.java | 56 +-
.../cache/GridCacheDeploymentManager.java | 2 +-
.../processors/cache/GridCacheEntrySet.java | 113 -
.../cache/GridCacheEvictionManager.java | 2 +-
.../processors/cache/GridCacheKeySet.java | 104 -
.../processors/cache/GridCacheMapEntry.java | 22 +-
.../processors/cache/GridCacheProcessor.java | 1 -
.../processors/cache/GridCacheProxyImpl.java | 2 +-
.../processors/cache/GridCacheUtils.java | 1 -
.../processors/cache/GridNoStorageCacheMap.java | 107 +
.../processors/cache/IgniteInternalCache.java | 2 +-
.../processors/cache/KeyCacheObject.java | 11 +
.../processors/cache/KeyCacheObjectImpl.java | 32 +-
.../binary/CacheObjectBinaryProcessorImpl.java | 24 +-
.../GridDistributedCacheAdapter.java | 4 +-
.../distributed/GridDistributedLockRequest.java | 19 +-
.../GridDistributedTxRemoteAdapter.java | 1 -
.../GridDistributedUnlockRequest.java | 16 +-
.../dht/GridCachePartitionedConcurrentMap.java | 191 ++
.../dht/GridClientPartitionTopology.java | 7 -
.../distributed/dht/GridDhtCacheAdapter.java | 77 +-
.../distributed/dht/GridDhtCacheEntry.java | 25 +-
.../distributed/dht/GridDhtLocalPartition.java | 152 +-
.../distributed/dht/GridDhtLockRequest.java | 16 +-
.../dht/GridDhtPartitionTopology.java | 7 -
.../dht/GridDhtPartitionTopologyImpl.java | 281 ++-
.../distributed/dht/GridDhtUnlockRequest.java | 15 +-
.../distributed/dht/GridNoStorageCacheMap.java | 122 --
.../dht/GridPartitionedGetFuture.java | 2 +-
.../dht/GridPartitionedSingleGetFuture.java | 2 +-
.../dht/atomic/GridDhtAtomicCache.java | 75 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 22 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 6 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 31 +-
.../dht/colocated/GridDhtColocatedCache.java | 13 +-
.../dht/preloader/GridDhtPartitionDemander.java | 2 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 8 +-
.../distributed/near/GridNearCacheAdapter.java | 94 +-
.../distributed/near/GridNearGetFuture.java | 2 +-
.../distributed/near/GridNearGetRequest.java | 45 +-
.../distributed/near/GridNearLockRequest.java | 16 +-
.../near/GridNearSingleGetRequest.java | 22 +-
.../distributed/near/GridNearUnlockRequest.java | 21 +-
.../processors/cache/local/GridLocalCache.java | 7 +-
.../local/atomic/GridLocalAtomicCache.java | 24 +-
.../cache/transactions/IgniteInternalTx.java | 1 -
.../cache/transactions/IgniteTxAdapter.java | 1 -
.../cache/transactions/IgniteTxEntry.java | 33 +-
.../cacheobject/IgniteCacheObjectProcessor.java | 10 +
.../IgniteCacheObjectProcessorImpl.java | 36 +-
.../GridCacheAtomicReferenceImpl.java | 2 +-
.../internal/util/PartitionedReadOnlySet.java | 71 +
.../util/StripedCompositeReadWriteLock.java | 10 +
.../ignite/internal/util/lang/GridFunc.java | 3 +-
.../ignite/internal/visor/cache/VisorCache.java | 6 +-
.../GridCachePreloadingEvictionsSelfTest.java | 4 +-
.../cache/GridCacheTtlManagerSelfTest.java | 2 +-
...idCacheValueConsistencyAbstractSelfTest.java | 3 +-
.../IgniteDynamicClientCacheStartSelfTest.java | 10 -
.../IgniteTxStoreExceptionAbstractSelfTest.java | 8 +-
.../GridCacheBinaryObjectsAbstractSelfTest.java | 2 +-
...actQueueFailoverDataConsistencySelfTest.java | 2 +-
.../GridCacheQueueCleanupSelfTest.java | 4 +-
.../GridCacheSequenceApiSelfAbstractTest.java | 37 -
.../GridCacheSetAbstractSelfTest.java | 5 +-
.../GridCacheSetFailoverAbstractSelfTest.java | 8 +-
.../IgniteDataStructureUniqueNameTest.java | 3 +-
.../IgnitePartitionedQueueNoBackupsTest.java | 6 +-
.../IgnitePartitionedSetNoBackupsSelfTest.java | 6 +-
.../distributed/dht/GridCacheDhtTestUtils.java | 9 +-
.../near/GridCacheNearOneNodeSelfTest.java | 4 +-
.../processors/igfs/IgfsAbstractSelfTest.java | 10 +-
79 files changed, 1958 insertions(+), 2975 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/model/IntValue.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/model/IntValue.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/model/IntValue.java
index 0a97e36..45fbe79 100644
--- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/model/IntValue.java
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/model/IntValue.java
@@ -85,7 +85,24 @@ public class IntValue implements Externalizable, Binarylizable {
}
/** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ IntValue value = (IntValue)o;
+
+ return val == value.val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return "Value [id=" + val + ']';
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index 0997d6f..fa10de3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -72,6 +72,10 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
@GridDirectTransient
private boolean detachAllowed;
+ /** */
+ @GridDirectTransient
+ private int part = -1;
+
/**
* For {@link Externalizable}.
*/
@@ -94,6 +98,16 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
}
/** {@inheritDoc} */
+ @Override public int partition() {
+ return part;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void partition(int part) {
+ this.part = part;
+ }
+
+ /** {@inheritDoc} */
@Override public byte cacheObjectType() {
return TYPE_BINARY;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index d807e26..8c1a750 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -23,8 +23,8 @@ import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
+import java.util.AbstractSet;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -101,13 +101,11 @@ import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFi
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.internal.util.lang.GridTriple;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.C2;
@@ -132,7 +130,6 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
-import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.mxbean.CacheMetricsMXBean;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.IgniteInstanceResource;
@@ -146,8 +143,6 @@ import org.jsr166.LongAdder8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_KEY_VALIDATION_DISABLED;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT;
-import static org.apache.ignite.events.EventType.EVT_CACHE_ENTRY_CREATED;
-import static org.apache.ignite.events.EventType.EVT_CACHE_ENTRY_DESTROYED;
import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
@@ -304,7 +299,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
@SuppressWarnings("OverriddenMethodCallDuringObjectConstruction")
protected GridCacheAdapter(GridCacheContext<K, V> ctx, int startSize) {
- this(ctx, new GridCacheConcurrentMap(ctx, startSize, null));
+ this(ctx, null);
}
/**
@@ -312,7 +307,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param map Concurrent map.
*/
@SuppressWarnings({"OverriddenMethodCallDuringObjectConstruction", "deprecation"})
- protected GridCacheAdapter(final GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) {
+ protected GridCacheAdapter(final GridCacheContext<K, V> ctx, @Nullable GridCacheConcurrentMap map) {
assert ctx != null;
this.ctx = ctx;
@@ -393,6 +388,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * Increments map public size.
+ * @param e Map entry.
+ */
+ public void incrementSize(GridCacheMapEntry e) {
+ map.incrementPublicSize(e);
+ }
+
+ /**
+ * Decrements map public size.
+ * @param e Map entry.
+ */
+ public void decrementSize(GridCacheMapEntry e) {
+ map.decrementPublicSize(e);
+ }
+
+ /**
* @return Context.
*/
@Override public GridCacheContext<K, V> context() {
@@ -483,7 +494,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx);
}
-
/** {@inheritDoc} */
@Nullable @Override public ExpiryPolicy expiry() {
return null;
@@ -541,13 +551,25 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * @return Entry factory.
+ */
+ protected abstract GridCacheMapEntryFactory entryFactory();
+
+ /**
* Starts this cache. Child classes should override this method
* to provide custom start-up behavior.
*
* @throws IgniteCheckedException If start failed.
*/
public void start() throws IgniteCheckedException {
- // No-op.
+ if (map == null) {
+ int initSize = ctx.config().getStartSize();
+
+ if (!isLocal())
+ initSize /= ctx.affinity().partitions();
+
+ map = new GridCacheConcurrentMapImpl(ctx, entryFactory(), initSize);
+ }
}
/**
@@ -687,7 +709,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
modes.backup = true;
if (modes.heap)
- its.add(iterator(map.entries0().iterator(), !ctx.keepBinary()));
+ its.add(iterator(map.entries().iterator(), !ctx.keepBinary()));
}
else if (modes.heap) {
if (modes.near && ctx.isNear())
@@ -946,74 +968,28 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
GridCacheMapEntry cur = map.getEntry(key);
if (cur == null || cur.obsolete()) {
- GridTriple<GridCacheMapEntry> t = map.putEntryIfObsoleteOrAbsent(
+ cur = map.putEntryIfObsoleteOrAbsent(
topVer,
key,
null,
- create);
-
- cur = t.get1();
-
- GridCacheEntryEx created = t.get2();
- GridCacheEntryEx doomed = t.get3();
-
- if (doomed != null && ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED))
- // Event notification.
- ctx.events().addEvent(doomed.partition(),
- doomed.key(),
- locNodeId,
- (IgniteUuid)null,
- null,
- EVT_CACHE_ENTRY_DESTROYED,
- null,
- false,
- null,
- false,
- null,
- null,
- null,
- true);
-
- if (created != null) {
- // Event notification.
- if (ctx.events().isRecordable(EVT_CACHE_ENTRY_CREATED))
- ctx.events().addEvent(created.partition(),
- created.key(),
- locNodeId,
- (IgniteUuid)null,
- null,
- EVT_CACHE_ENTRY_CREATED,
- null,
- false,
- null,
- false,
- null,
- null,
- null,
- true);
-
- if (touch)
- ctx.evicts().touch(
- cur,
- topVer);
- }
+ create, touch);
}
return cur;
}
/**
- * @return Set of internal cached entry representations, excluding {@link GridCacheInternal} keys.
+ * @return Set of internal cached entry representations.
*/
- public Set<GridCacheEntryEx> entries() {
- return map.entries0();
+ public Iterable<? extends GridCacheEntryEx> entries() {
+ return allEntries();
}
/**
- * @return Set of internal cached entry representations, including {@link GridCacheInternal} keys.
+ * @return Set of internal cached entry representations.
*/
- public Set<GridCacheEntryEx> allEntries() {
- return map.allEntries0();
+ public Iterable<? extends GridCacheEntryEx> allEntries() {
+ return map.entries();
}
/** {@inheritDoc} */
@@ -1022,8 +998,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {
- return map.entriesx(filter);
+ @Override public Set<Cache.Entry<K, V>> entrySetx(final CacheEntryPredicate... filter) {
+ return new EntrySet(map.entrySet(filter));
}
/** {@inheritDoc} */
@@ -1033,22 +1009,57 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public Set<K> keySet() {
- return map.keySet();
+ return new KeySet(map.entrySet());
}
/** {@inheritDoc} */
@Override public Set<K> keySetx() {
- return map.keySetx();
+ return keySet();
}
/** {@inheritDoc} */
@Override public Set<K> primaryKeySet() {
- return map.keySet(CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode()));
+ return new KeySet(map.entrySet(CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode())));
}
/** {@inheritDoc} */
- @Override public Collection<V> values() {
- return map.values();
+ @Override public Iterable<V> values() {
+ return values((CacheEntryPredicate[])null);
+ }
+
+ /**
+ * Collection of values cached on this node. You cannot modify this collection.
+ * <p>
+ * Iterator over this collection will not fail if collection was
+ * concurrently updated by another thread. This means that iterator may or
+ * may not return latest values depending on whether they were added before
+ * or after current iterator position.
+ * <p>
+ * NOTE: this operation is not distributed and returns only the values cached on this node.
+ *
+ * @param filter Filters.
+ * @return Collection of cached values.
+ */
+ public Iterable<V> values(final CacheEntryPredicate... filter) {
+ return new Iterable<V>() {
+ @Override public Iterator<V> iterator() {
+ return new Iterator<V>() {
+ private final Iterator<? extends GridCacheEntryEx> it = entries().iterator();
+
+ @Override public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override public V next() {
+ return (V) it.next().wrap().getValue();
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+ };
+ }
+ };
}
/**
@@ -1058,21 +1069,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
public void removeIfObsolete(KeyCacheObject key) {
assert key != null;
- GridCacheEntryEx entry = map.removeEntryIfObsolete(key);
+ GridCacheMapEntry entry = map.getEntry(key);
- if (entry != null) {
- assert entry.obsolete() : "Removed non-obsolete entry: " + entry;
-
- if (log.isDebugEnabled())
- log.debug("Removed entry from cache: " + entry);
-
- if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED))
- // Event notification.
- ctx.events().addEvent(entry.partition(), entry.key(), locNodeId, (IgniteUuid)null, null,
- EVT_CACHE_ENTRY_DESTROYED, null, false, null, false, null, null, null, false);
- }
- else if (log.isDebugEnabled())
- log.debug("Remove will not be done for key (obsolete entry got replaced or removed): " + key);
+ if (entry.obsolete())
+ removeEntry(entry);
}
/**
@@ -1088,7 +1088,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
int keySize = size();
int cnt = Math.min(keySize / CLEAR_ALL_SPLIT_THRESHOLD + (keySize % CLEAR_ALL_SPLIT_THRESHOLD != 0 ? 1 : 0),
- Runtime.getRuntime().availableProcessors());
+ Runtime.getRuntime().availableProcessors());
if (cnt == 0)
cnt = 1; // Still perform cleanup since there could be entries in swap.
@@ -1256,7 +1256,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param entry Removes entry from cache if currently mapped value is the same as passed.
*/
public void removeEntry(GridCacheEntryEx entry) {
- map.removeEntry(entry);
+ boolean removed = map.removeEntry(entry);
+
+ if (log.isDebugEnabled()) {
+ if (removed)
+ log.debug("Removed entry from cache: " + entry);
+ else
+ log.debug("Remove will not be done for key (entry got replaced or removed): " + entry.key());
+ }
}
/**
@@ -1399,7 +1406,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
T2<V, GridCacheVersion> t = (T2<V, GridCacheVersion>)get(key, !ctx.keepBinary(), true);
- CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()): null;
+ CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()) : null;
if (ctx.config().getInterceptor() != null) {
V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null);
@@ -1424,7 +1431,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<V> fut = getAsync(key, !ctx.keepBinary(), false);
if (ctx.config().getInterceptor() != null)
- fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() {
+ fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() {
@Override public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException {
return (V)ctx.config().getInterceptor().onGet(key, f.get());
}
@@ -1451,20 +1458,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<CacheEntry<K, V>> fr = fut.chain(
new CX1<IgniteInternalFuture<T2<V, GridCacheVersion>>, CacheEntry<K, V>>() {
- @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<T2<V, GridCacheVersion>> f)
- throws IgniteCheckedException {
- T2<V, GridCacheVersion> t = f.get();
+ @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<T2<V, GridCacheVersion>> f)
+ throws IgniteCheckedException {
+ T2<V, GridCacheVersion> t = f.get();
- CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()) : null;
- if (intercept) {
- V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null);
+ CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()) : null;
+ if (intercept) {
+ V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null);
- return new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null);
+ return new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null);
+ }
+ else
+ return val;
}
- else
- return val;
- }
- });
+ });
if (statsEnabled)
fut.listen(new UpdateGetTimeStatClosure<T2<V, GridCacheVersion>>(metrics0(), start));
@@ -1516,7 +1523,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return res;
}
-
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys) {
A.notNull(keys, "keys");
@@ -1541,7 +1547,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(
+ @Override public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(
@Nullable final Collection<? extends K> keys) {
A.notNull(keys, "keys");
@@ -1850,7 +1856,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final AffinityTopologyVersion topVer = tx == null ?
(canRemap ?
ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
- tx.topologyVersion();
+ tx.topologyVersion();
int keysSize = keys.size();
@@ -2091,8 +2097,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @throws IgniteCheckedException If failed.
*/
@Nullable public V getAndPut(final K key, final V val, @Nullable final CacheEntryPredicate filter)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
boolean statsEnabled = ctx.config().isStatisticsEnabled();
long start = statsEnabled ? System.nanoTime() : 0L;
@@ -2351,7 +2356,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<?> fut = asyncOp(new AsyncOp() {
@Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
- Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor);
+ Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
return tx.invokeAsync(ctx, readyTopVer, invokeMap, args);
}
@@ -2578,7 +2583,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
});
- if(statsEnabled)
+ if (statsEnabled)
fut.listen(new UpdatePutTimeStatClosure<V>(metrics0(), start));
return fut;
@@ -2912,11 +2917,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
assert ctx.isLocal();
for (Iterator<KeyCacheObject> it = ctx.swap().offHeapKeyIterator(true, true, AffinityTopologyVersion.NONE);
- it.hasNext(); )
+ it.hasNext(); )
remove((K)it.next());
for (Iterator<KeyCacheObject> it = ctx.swap().swapKeyIterator(true, true, AffinityTopologyVersion.NONE);
- it.hasNext(); )
+ it.hasNext(); )
remove((K)it.next());
removeAll(keySet());
@@ -3436,7 +3441,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (ttl == CU.TTL_ZERO)
return;
- loadEntry(key, val, ver0, (IgniteBiPredicate<Object, Object>) p, topVer, replicate, ttl);
+ loadEntry(key, val, ver0, (IgniteBiPredicate<Object, Object>)p, topVer, replicate, ttl);
}
}, args);
}
@@ -3490,8 +3495,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> localLoadCacheAsync(final IgniteBiPredicate<K, V> p,
- final Object[] args)
- {
+ final Object[] args) {
return ctx.closures().callLocalSafe(
ctx.projectSafe(new Callable<Object>() {
@Nullable @Override public Object call() throws IgniteCheckedException {
@@ -3596,8 +3600,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
public void localLoad(Collection<? extends K> keys,
@Nullable ExpiryPolicy plc)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
final boolean replicate = ctx.isDrEnabled();
final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
@@ -4574,14 +4577,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Entry set.
*/
public Set<Cache.Entry<K, V>> entrySet(@Nullable CacheEntryPredicate... filter) {
- return map.entries(filter);
+ return entrySetx(filter);
}
/**
* @return Primary entry set.
*/
public Set<Cache.Entry<K, V>> primaryEntrySet() {
- return map.entries(CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode()));
+ return new EntrySet(map.entrySet(CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode())));
}
/**
@@ -4612,15 +4615,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
String taskName,
boolean deserializeBinary,
boolean needVer) throws IgniteCheckedException {
- return getAsync(key,
- !ctx.config().isReadFromBackup(),
- /*skip tx*/false,
- null,
- taskName,
- deserializeBinary,
- false,
- /*can remap*/true,
- needVer).get();
+ try {
+ return getAsync(key,
+ !ctx.config().isReadFromBackup(),
+ /*skip tx*/false,
+ null,
+ taskName,
+ deserializeBinary,
+ false,
+ /*can remap*/true,
+ needVer).get();
+ }
+ catch (IgniteException e) {
+ if (e.getCause(IgniteCheckedException.class) != null)
+ throw e.getCause(IgniteCheckedException.class);
+ else
+ throw e;
+ }
}
/**
@@ -4741,7 +4752,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param deserializeBinary Deserialize binary flag.
* @return Public API iterator.
*/
- protected Iterator<Cache.Entry<K, V>> iterator(final Iterator<GridCacheEntryEx> it,
+ protected Iterator<Cache.Entry<K, V>> iterator(final Iterator<? extends GridCacheEntryEx> it,
final boolean deserializeBinary) {
return new Iterator<Cache.Entry<K, V>>() {
{
@@ -4807,8 +4818,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
@Nullable private Cache.Entry<K, V> toCacheEntry(GridCacheEntryEx entry,
boolean deserializeBinary)
- throws IgniteCheckedException, GridCacheEntryRemovedException
- {
+ throws IgniteCheckedException, GridCacheEntryRemovedException {
CacheObject val = entry.innerGet(
null,
null,
@@ -5497,7 +5507,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param loadArgs Arguments.
* @param plc Policy.
*/
- private LoadCacheJob(String cacheName, AffinityTopologyVersion topVer, IgniteBiPredicate<K, V> p, Object[] loadArgs,
+ private LoadCacheJob(String cacheName, AffinityTopologyVersion topVer, IgniteBiPredicate<K, V> p,
+ Object[] loadArgs,
ExpiryPolicy plc) {
super(cacheName, topVer);
@@ -5710,7 +5721,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
*
*/
- static class LoadKeysCallable<K, V> implements IgniteCallable<Void>, Externalizable{
+ static class LoadKeysCallable<K, V> implements IgniteCallable<Void>, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -5831,8 +5842,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public void applyx(KeyCacheObject key, Object val, GridCacheVersion ver)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
assert ver != null;
if (p != null && !p.apply(key.<K>value(ctx.cacheObjectContext(), false), (V)val))
@@ -5914,8 +5924,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
private LoadCacheClosure(String cacheName,
IgniteBiPredicate<K, V> p,
Object[] args,
- @Nullable ExpiryPolicy plc)
- {
+ @Nullable ExpiryPolicy plc) {
this.cacheName = cacheName;
this.p = p;
this.args = args;
@@ -6380,4 +6389,173 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return null;
}
}
+
+ /**
+ * Iterator implementation for KeySet.
+ */
+ private final class KeySetIterator implements Iterator<K> {
+ /** Internal map entry iterator. */
+ private final Iterator<GridCacheMapEntry> internalIterator;
+
+ /** Keep binary flag. */
+ private final boolean keepBinary;
+
+ /** Current entry. */
+ private GridCacheMapEntry current;
+
+ /**
+ * Constructor.
+ * @param internalIterator Internal iterator.
+ * @param keepBinary Keep binary flag.
+ */
+ private KeySetIterator(Iterator<GridCacheMapEntry> internalIterator, boolean keepBinary) {
+ this.internalIterator = internalIterator;
+ this.keepBinary = keepBinary;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return internalIterator.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public K next() {
+ current = internalIterator.next();
+
+ return (K)ctx.unwrapBinaryIfNeeded(current.key(), keepBinary, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ if (current == null)
+ throw new IllegalStateException();
+
+ try {
+ GridCacheAdapter.this.getAndRemove((K)current.key());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ current = null;
+ }
+ }
+
+ /**
+ * A wrapper over internal map that provides set semantics and constant-time contains() check.
+ */
+ private final class KeySet extends AbstractSet<K> {
+ /** Internal entry set. */
+ private final Set<GridCacheMapEntry> internalSet;
+
+ /** Keep binary flag. */
+ private final boolean keepBinary;
+
+ /**
+ * Constructor
+ * @param internalSet Internal set.
+ */
+ private KeySet(Set<GridCacheMapEntry> internalSet) {
+ this.internalSet = internalSet;
+
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ keepBinary = opCtx != null && opCtx.isKeepBinary();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<K> iterator() {
+ return new KeySetIterator(internalSet.iterator(), keepBinary);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return F.size(iterator());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean contains(Object o) {
+ GridCacheMapEntry entry = map.getEntry(ctx.toCacheKeyObject(o));
+
+ return entry != null && internalSet.contains(entry);
+ }
+ }
+
+ /**
+ * Iterator implementation for EntrySet.
+ */
+ private final class EntryIterator implements Iterator<Cache.Entry<K, V>> {
+
+ /** Internal iterator. */
+ private final Iterator<GridCacheMapEntry> internalIterator;
+
+ /** Current entry. */
+ private GridCacheMapEntry current;
+
+ /**
+ * Constructor.
+ * @param internalIterator Internal iterator.
+ */
+ private EntryIterator(Iterator<GridCacheMapEntry> internalIterator) {
+ this.internalIterator = internalIterator;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return internalIterator.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cache.Entry<K, V> next() {
+ current = internalIterator.next();
+
+ return current.wrapLazyValue();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ if (current == null)
+ throw new IllegalStateException();
+
+ try {
+ GridCacheAdapter.this.getAndRemove((K)current.wrapLazyValue().getKey());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ current = null;
+ }
+ }
+
+ /**
+ * A wrapper over internal map that provides set semantics and constant-time contains() check.
+ */
+ private final class EntrySet extends AbstractSet<Cache.Entry<K, V>> {
+
+ /** Internal set. */
+ private final Set<GridCacheMapEntry> internalSet;
+
+ /** Constructor. */
+ private EntrySet(Set<GridCacheMapEntry> internalSet) {
+ this.internalSet = internalSet;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<Cache.Entry<K, V>> iterator() {
+ return new EntryIterator(internalSet.iterator());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return F.size(iterator());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean contains(Object o) {
+ GridCacheMapEntry entry = map.getEntry(ctx.toCacheKeyObject(o));
+
+ return entry != null && internalSet.contains(entry);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index f1767e0..5e843dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -197,10 +197,18 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
public int partition(Object key) {
GridAffinityAssignmentCache aff0 = aff;
+ if (key instanceof KeyCacheObject && ((KeyCacheObject)key).partition() != -1)
+ return ((KeyCacheObject)key).partition();
+
if (aff0 == null)
throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name());
- return affFunction.partition(affinityKey(key));
+ int p = affFunction.partition(affinityKey(key));
+
+ if (key instanceof KeyCacheObject)
+ ((KeyCacheObject)key).partition(p);
+
+ return p;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
index ffce82d..4f97e7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
@@ -79,7 +79,7 @@ public class GridCacheClearAllRunnable<K, V> implements Runnable {
/** {@inheritDoc} */
@Override public void run() {
- Iterator<GridCacheEntryEx> iter = cache.map().stripedEntryIterator(id, totalCnt);
+ Iterator<? extends GridCacheEntryEx> iter = cache.entries().iterator();
while (iter.hasNext())
clearEntry(iter.next());