You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/19 08:51:54 UTC
[2/4] ignite git commit: ignite-2412 Do not call 'asyncOp' for synchronous operations
ignite-2412 Do not call 'asyncOp' for synchronous operations
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c530d47b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c530d47b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c530d47b
Branch: refs/heads/ignite-4371
Commit: c530d47b8e3fd514e49bc59a1a7a3bde1060912a
Parents: 0c782b0
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 16 19:23:29 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 16 19:23:29 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 721 +++++++------------
.../processors/cache/IgniteCacheProxy.java | 8 -
.../dht/atomic/GridDhtAtomicCache.java | 472 +++++++-----
.../dht/colocated/GridDhtColocatedCache.java | 13 -
.../distributed/near/GridNearAtomicCache.java | 65 +-
.../local/atomic/GridLocalAtomicCache.java | 177 +----
...nabledMultiNodeLongTxTimeoutFullApiTest.java | 2 +-
...lockMessageSystemPoolStarvationSelfTest.java | 14 +-
8 files changed, 597 insertions(+), 875 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c530d47b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 88aa4e0..5707b49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -248,16 +248,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** Grid configuration. */
@GridToStringExclude
- protected IgniteConfiguration gridCfg;
+ private IgniteConfiguration gridCfg;
/** Cache metrics. */
protected CacheMetricsImpl metrics;
/** Cache localMxBean. */
- protected CacheMetricsMXBean localMxBean;
+ private CacheMetricsMXBean locMxBean;
/** Cache mxBean. */
- protected CacheMetricsMXBean clusterMxBean;
+ private CacheMetricsMXBean clusterMxBean;
/** Logger. */
protected IgniteLogger log;
@@ -288,9 +288,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** Asynchronous operations limit semaphore. */
private Semaphore asyncOpsSem;
- /** */
- protected volatile boolean asyncToggled;
-
/** {@inheritDoc} */
@Override public String name() {
return cacheCfg.getName();
@@ -334,7 +331,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
metrics = new CacheMetricsImpl(ctx);
- localMxBean = new CacheLocalMetricsMXBeanImpl(this);
+ locMxBean = new CacheLocalMetricsMXBeanImpl(this);
clusterMxBean = new CacheClusterMetricsMXBeanImpl(this);
FileSystemConfiguration[] igfsCfgs = gridCfg.getFileSystemConfiguration();
@@ -367,18 +364,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
- * Toggles async flag if someone calls {@code withAsync()}
- * on proxy and since that we have to properly handle all cache
- * operations (sync and async) to put them in proper sequence.
- *
- * TODO: https://issues.apache.org/jira/browse/IGNITE-4393
- */
- void toggleAsync() {
- if (!asyncToggled)
- asyncToggled = true;
- }
-
- /**
* Prints memory stats.
*/
public void printMemoryStats() {
@@ -471,49 +456,49 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
public abstract GridCachePreloader preloader();
/** {@inheritDoc} */
- @Override public Affinity<K> affinity() {
+ @Override public final Affinity<K> affinity() {
return aff;
}
/** {@inheritDoc} */
@SuppressWarnings({"unchecked", "RedundantCast"})
- @Override public <K1, V1> IgniteInternalCache<K1, V1> cache() {
+ @Override public final <K1, V1> IgniteInternalCache<K1, V1> cache() {
return (IgniteInternalCache<K1, V1>)this;
}
/** {@inheritDoc} */
- @Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
+ @Override public final GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false, null);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
/** {@inheritDoc} */
- @Override public boolean skipStore() {
+ @Override public final boolean skipStore() {
return false;
}
/** {@inheritDoc} */
- @Override public GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) {
+ @Override public final GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) {
CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false, null);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
/** {@inheritDoc} */
- @Override public <K1, V1> GridCacheProxyImpl<K1, V1> keepBinary() {
+ @Override public final <K1, V1> GridCacheProxyImpl<K1, V1> keepBinary() {
CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false, null);
return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx);
}
/** {@inheritDoc} */
- @Nullable @Override public ExpiryPolicy expiry() {
+ @Nullable @Override public final ExpiryPolicy expiry() {
return null;
}
/** {@inheritDoc} */
- @Override public GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+ @Override public final GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) {
assert !CU.isUtilityCache(ctx.name());
assert !CU.isAtomicsCache(ctx.name());
assert !CU.isMarshallerCache(ctx.name());
@@ -524,14 +509,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public IgniteInternalCache<K, V> withNoRetries() {
+ @Override public final IgniteInternalCache<K, V> withNoRetries() {
CacheOperationContext opCtx = new CacheOperationContext(false, null, false, null, true, null);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
/** {@inheritDoc} */
- @Override public CacheConfiguration configuration() {
+ @Override public final CacheConfiguration configuration() {
return ctx.config();
}
@@ -630,7 +615,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public boolean isEmpty() {
+ @Override public final boolean isEmpty() {
try {
return localSize(CachePeekModes.ONHEAP_ONLY) == 0;
}
@@ -640,7 +625,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public boolean containsKey(K key) {
+ @Override public final boolean containsKey(K key) {
try {
return containsKeyAsync(key).get();
}
@@ -667,7 +652,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public boolean containsKeys(Collection<? extends K> keys) {
+ @Override public final boolean containsKeys(Collection<? extends K> keys) {
try {
return containsKeysAsync(keys).get();
}
@@ -677,7 +662,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> containsKeysAsync(final Collection<? extends K> keys) {
+ @Override public final IgniteInternalFuture<Boolean> containsKeysAsync(final Collection<? extends K> keys) {
A.notNull(keys, "keys");
return getAllAsync(
@@ -708,7 +693,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode[] peekModes) throws IgniteCheckedException {
+ @Override public final Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode[] peekModes) throws IgniteCheckedException {
assert peekModes != null;
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -765,7 +750,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@SuppressWarnings("ForLoopReplaceableByForEach")
- @Nullable @Override public V localPeek(K key,
+ @Nullable @Override public final V localPeek(K key,
CachePeekMode[] peekModes,
@Nullable IgniteCacheExpiryPolicy plc)
throws IgniteCheckedException {
@@ -907,7 +892,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*
* @param ldr Class loader to undeploy.
*/
- public void onUndeploy(ClassLoader ldr) {
+ public final void onUndeploy(ClassLoader ldr) {
ctx.deploy().onUndeploy(ldr, context());
}
@@ -916,7 +901,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param key Entry key.
* @return Entry or <tt>null</tt>.
*/
- @Nullable public GridCacheEntryEx peekEx(KeyCacheObject key) {
+ @Nullable public final GridCacheEntryEx peekEx(KeyCacheObject key) {
return entry0(key, ctx.affinity().affinityTopologyVersion(), false, false);
}
@@ -925,7 +910,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param key Entry key.
* @return Entry or <tt>null</tt>.
*/
- @Nullable public GridCacheEntryEx peekEx(Object key) {
+ @Nullable public final GridCacheEntryEx peekEx(Object key) {
return entry0(ctx.toCacheKeyObject(key), ctx.affinity().affinityTopologyVersion(), false, false);
}
@@ -933,7 +918,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param key Entry key.
* @return Entry (never {@code null}).
*/
- public GridCacheEntryEx entryEx(Object key) {
+ public final GridCacheEntryEx entryEx(Object key) {
return entryEx(ctx.toCacheKeyObject(key), false);
}
@@ -941,7 +926,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param key Entry key.
* @return Entry (never {@code null}).
*/
- public GridCacheEntryEx entryEx(KeyCacheObject key) {
+ public final GridCacheEntryEx entryEx(KeyCacheObject key) {
return entryEx(key, false);
}
@@ -996,24 +981,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @return Set of internal cached entry representations.
*/
- public Iterable<? extends GridCacheEntryEx> entries() {
+ public final Iterable<? extends GridCacheEntryEx> entries() {
return allEntries();
}
/**
* @return Set of internal cached entry representations.
*/
- public Iterable<? extends GridCacheEntryEx> allEntries() {
+ public final Iterable<? extends GridCacheEntryEx> allEntries() {
return map.entries();
}
/** {@inheritDoc} */
- @Override public Set<Cache.Entry<K, V>> entrySet() {
+ @Override public final Set<Cache.Entry<K, V>> entrySet() {
return entrySet((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
- @Override public Set<Cache.Entry<K, V>> entrySetx(final CacheEntryPredicate... filter) {
+ @Override public final Set<Cache.Entry<K, V>> entrySetx(final CacheEntryPredicate... filter) {
boolean keepBinary = ctx.keepBinary();
return new EntrySet(map.entrySet(filter), keepBinary);
@@ -1025,17 +1010,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public Set<K> keySet() {
+ @Override public final Set<K> keySet() {
return new KeySet(map.entrySet());
}
/** {@inheritDoc} */
- @Override public Set<K> keySetx() {
+ @Override public final Set<K> keySetx() {
return keySet();
}
/** {@inheritDoc} */
- @Override public Set<K> primaryKeySet() {
+ @Override public final Set<K> primaryKeySet() {
return new KeySet(map.entrySet(CU.cachePrimary(ctx.grid().affinity(ctx.name()), ctx.localNode())));
}
@@ -1057,7 +1042,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param filter Filters.
* @return Collection of cached values.
*/
- public Iterable<V> values(final CacheEntryPredicate... filter) {
+ public final Iterable<V> values(final CacheEntryPredicate... filter) {
return new Iterable<V>() {
@Override public Iterator<V> iterator() {
return new Iterator<V>() {
@@ -1083,12 +1068,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*
* @param key Entry key.
*/
- public void removeIfObsolete(KeyCacheObject key) {
+ public final void removeIfObsolete(KeyCacheObject key) {
assert key != null;
GridCacheMapEntry entry = map.getEntry(key);
- if (entry.obsolete())
+ if (entry != null && entry.obsolete())
removeEntry(entry);
}
@@ -1272,11 +1257,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @param entry Removes entry from cache if currently mapped value is the same as passed.
*/
- public void removeEntry(GridCacheEntryEx entry) {
- boolean removed = map.removeEntry(entry);
+ public final void removeEntry(GridCacheEntryEx entry) {
+ boolean rmvd = map.removeEntry(entry);
if (log.isDebugEnabled()) {
- if (removed)
+ if (rmvd)
log.debug("Removed entry from cache: " + entry);
else
log.debug("Remove will not be done for key (entry got replaced or removed): " + entry.key());
@@ -1311,7 +1296,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public V getForcePrimary(K key) throws IgniteCheckedException {
+ @Override public final V getForcePrimary(K key) throws IgniteCheckedException {
String taskName = ctx.kernalContext().job().currentTaskName();
return getAllAsync(
@@ -1328,7 +1313,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<V> getForcePrimaryAsync(final K key) {
+ @Override public final IgniteInternalFuture<V> getForcePrimaryAsync(final K key) {
String taskName = ctx.kernalContext().job().currentTaskName();
return getAllAsync(
@@ -1349,7 +1334,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- public V getTopologySafe(K key) throws IgniteCheckedException {
+ public final V getTopologySafe(K key) throws IgniteCheckedException {
String taskName = ctx.kernalContext().job().currentTaskName();
return getAllAsync(
@@ -1366,12 +1351,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Nullable @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException {
+ @Nullable @Override public final Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException {
return getAllOutTxAsync(keys).get();
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) {
+ @Override public final IgniteInternalFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) {
String taskName = ctx.kernalContext().job().currentTaskName();
return getAllAsync(keys,
@@ -1385,15 +1370,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
false);
}
- /**
- * @param key Key.
- * @param topVer Topology version.
- * @return Entry.
- */
- @Nullable protected GridCacheEntryEx entryExSafe(KeyCacheObject key, AffinityTopologyVersion topVer) {
- return entryEx(key);
- }
-
/** {@inheritDoc} */
@Nullable @Override public V get(K key) throws IgniteCheckedException {
A.notNull(key, "key");
@@ -1533,14 +1509,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException {
+ @Override public final Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException {
A.notNull(keys, "keys");
boolean statsEnabled = ctx.config().isStatisticsEnabled();
long start = statsEnabled ? System.nanoTime() : 0L;
- Map<K, V> map = getAll(keys, !ctx.keepBinary(), false);
+ Map<K, V> map = getAll0(keys, !ctx.keepBinary(), false);
if (ctx.config().getInterceptor() != null)
map = interceptGet(keys, map);
@@ -1560,7 +1536,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
long start = statsEnabled ? System.nanoTime() : 0L;
- Map<K, T2<V, GridCacheVersion>> map = (Map<K, T2<V, GridCacheVersion>>)getAll(keys, !ctx.keepBinary(), true);
+ Map<K, T2<V, GridCacheVersion>> map = (Map<K, T2<V, GridCacheVersion>>)getAll0(keys, !ctx.keepBinary(), true);
Collection<CacheEntry<K, V>> res = new HashSet<>();
@@ -1875,7 +1851,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param needVer If {@code true} returns values as tuples containing value and version.
* @return Future.
*/
- public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
+ protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
@Nullable final Collection<KeyCacheObject> keys,
final boolean readThrough,
boolean checkTx,
@@ -2141,7 +2117,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public V getAndPut(K key, V val) throws IgniteCheckedException {
+ @Override public final V getAndPut(K key, V val) throws IgniteCheckedException {
return getAndPut(key, val, null);
}
@@ -2163,7 +2139,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKey(key);
- V prevVal = syncOp(new SyncOp<V>(true) {
+ V prevVal = getAndPut0(key, val, filter);
+
+ if (statsEnabled)
+ metrics0().addPutAndGetTimeNanos(System.nanoTime() - start);
+
+ return prevVal;
+ }
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ * @param filter Optional filter.
+ * @return Previous value.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected V getAndPut0(final K key, final V val, @Nullable final CacheEntryPredicate filter)
+ throws IgniteCheckedException {
+ return syncOp(new SyncOp<V>(true) {
@Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return (V)tx.putAsync(ctx, null, key, val, true, filter).get().value();
}
@@ -2172,15 +2165,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return "put [key=" + key + ", val=" + val + ", filter=" + filter + ']';
}
});
-
- if (statsEnabled)
- metrics0().addPutAndGetTimeNanos(System.nanoTime() - start);
-
- return prevVal;
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<V> getAndPutAsync(K key, V val) {
+ @Override public final IgniteInternalFuture<V> getAndPutAsync(K key, V val) {
return getAndPutAsync(key, val, null);
}
@@ -2190,11 +2178,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param filter Filter.
* @return Put operation future.
*/
- public IgniteInternalFuture<V> getAndPutAsync(K key, V val, @Nullable CacheEntryPredicate filter) {
+ protected final IgniteInternalFuture<V> getAndPutAsync(K key, V val, @Nullable CacheEntryPredicate filter) {
final boolean statsEnabled = ctx.config().isStatisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
+ A.notNull(key, "key", val, "val");
+
+ if (keyCheck)
+ validateCacheKey(key);
+
IgniteInternalFuture<V> fut = getAndPutAsync0(key, val, filter);
if (statsEnabled)
@@ -2209,13 +2202,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param filter Optional filter.
* @return Put operation future.
*/
- public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val,
- @Nullable final CacheEntryPredicate filter) {
- A.notNull(key, "key", val, "val");
-
- if (keyCheck)
- validateCacheKey(key);
-
+ public IgniteInternalFuture<V> getAndPutAsync0(final K key,
+ final V val,
+ @Nullable final CacheEntryPredicate filter)
+ {
return asyncOp(new AsyncOp<V>() {
@Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
return tx.putAsync(ctx, readyTopVer, key, val, true, filter)
@@ -2229,7 +2219,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public boolean put(final K key, final V val) throws IgniteCheckedException {
+ @Override public final boolean put(final K key, final V val) throws IgniteCheckedException {
return put(key, val, null);
}
@@ -2253,7 +2243,26 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKey(key);
- Boolean stored = syncOp(new SyncOp<Boolean>(true) {
+ boolean stored = put0(key, val, filter);
+
+ if (statsEnabled && stored)
+ metrics0().addPutTimeNanos(System.nanoTime() - start);
+
+ return stored;
+ }
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ * @param filter Filter.
+ * @return {@code True} if optional filter passed and value was stored in cache,
+ * {@code false} otherwise. Note that this method will return {@code true} if filter is not
+ * specified.
+ * @throws IgniteCheckedException If put operation failed.
+ */
+ protected boolean put0(final K key, final V val, final CacheEntryPredicate filter)
+ throws IgniteCheckedException {
+ Boolean res = syncOp(new SyncOp<Boolean>(true) {
@Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.putAsync(ctx, null, key, val, false, filter).get().success();
}
@@ -2263,10 +2272,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
});
- if (statsEnabled)
- metrics0().addPutTimeNanos(System.nanoTime() - start);
+ assert res != null;
- return stored;
+ return res;
}
/** {@inheritDoc} */
@@ -2308,7 +2316,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Nullable @Override public <T> EntryProcessorResult<T> invoke(@Nullable AffinityTopologyVersion topVer,
+ @Nullable @Override public final <T> EntryProcessorResult<T> invoke(@Nullable AffinityTopologyVersion topVer,
K key,
EntryProcessor<K, V, T> entryProcessor,
Object... args) throws IgniteCheckedException {
@@ -2541,7 +2549,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> putAsync(K key, V val) {
+ @Override public final IgniteInternalFuture<Boolean> putAsync(K key, V val) {
return putAsync(key, val, null);
}
@@ -2551,9 +2559,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param filter Filter.
* @return Put future.
*/
- public IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate filter) {
+ public final IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate filter) {
A.notNull(key, "key", val, "val");
+ if (keyCheck)
+ validateCacheKey(key);
+
final boolean statsEnabled = ctx.config().isStatisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -2574,9 +2585,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val,
@Nullable final CacheEntryPredicate filter) {
- if (keyCheck)
- validateCacheKey(key);
-
return asyncOp(new AsyncOp<Boolean>() {
@Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
return tx.putAsync(ctx,
@@ -2601,267 +2609,82 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Nullable @Override public V getAndPutIfAbsent(final K key, final V val) throws IgniteCheckedException {
- A.notNull(key, "key", val, "val");
-
- if (keyCheck)
- validateCacheKey(key);
-
- return syncOp(new SyncOp<V>(true) {
- @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return (V)tx.putAsync(ctx, null, key, val, true, ctx.noVal()).get().value();
- }
-
- @Override public String toString() {
- return "putIfAbsent [key=" + key + ", val=" + val + ']';
- }
- });
+ @Nullable @Override public final V getAndPutIfAbsent(final K key, final V val) throws IgniteCheckedException {
+ return getAndPut(key, val, ctx.noVal());
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<V> getAndPutIfAbsentAsync(final K key, final V val) {
- final boolean statsEnabled = ctx.config().isStatisticsEnabled();
-
- final long start = statsEnabled ? System.nanoTime() : 0L;
-
- A.notNull(key, "key", val, "val");
-
- if (keyCheck)
- validateCacheKey(key);
-
- IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
- return tx.putAsync(ctx, readyTopVer, key, val, true, ctx.noVal())
- .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
- }
-
- @Override public String toString() {
- return "putIfAbsentAsync [key=" + key + ", val=" + val + ']';
- }
- });
-
- if (statsEnabled)
- fut.listen(new UpdatePutTimeStatClosure<V>(metrics0(), start));
-
- return fut;
+ @Override public final IgniteInternalFuture<V> getAndPutIfAbsentAsync(final K key, final V val) {
+ return getAndPutAsync(key, val, ctx.noVal());
}
/** {@inheritDoc} */
- @Override public boolean putIfAbsent(final K key, final V val) throws IgniteCheckedException {
- boolean statsEnabled = ctx.config().isStatisticsEnabled();
-
- long start = statsEnabled ? System.nanoTime() : 0L;
-
- A.notNull(key, "key", val, "val");
-
- if (keyCheck)
- validateCacheKey(key);
-
- Boolean stored = syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return tx.putAsync(ctx, null, key, val, false, ctx.noVal()).get().success();
- }
-
- @Override public String toString() {
- return "putxIfAbsent [key=" + key + ", val=" + val + ']';
- }
- });
-
- if (statsEnabled && stored)
- metrics0().addPutTimeNanos(System.nanoTime() - start);
-
- return stored;
+ @Override public final boolean putIfAbsent(final K key, final V val) throws IgniteCheckedException {
+ return put(key, val, ctx.noVal());
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> putIfAbsentAsync(final K key, final V val) {
- final boolean statsEnabled = ctx.config().isStatisticsEnabled();
-
- final long start = statsEnabled ? System.nanoTime() : 0L;
-
- A.notNull(key, "key", val, "val");
-
- if (keyCheck)
- validateCacheKey(key);
-
- IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
- return tx.putAsync(ctx,
- readyTopVer,
- key,
- val,
- false,
- ctx.noVal()).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
- }
-
- @Override public String toString() {
- return "putxIfAbsentAsync [key=" + key + ", val=" + val + ']';
- }
- });
-
- if (statsEnabled)
- fut.listen(new UpdatePutTimeStatClosure<Boolean>(metrics0(), start));
-
- return fut;
+ @Override public final IgniteInternalFuture<Boolean> putIfAbsentAsync(final K key, final V val) {
+ return putAsync(key, val, ctx.noVal());
}
/** {@inheritDoc} */
- @Nullable @Override public V getAndReplace(final K key, final V val) throws IgniteCheckedException {
- A.notNull(key, "key", val, "val");
-
- if (keyCheck)
- validateCacheKey(key);
-
- return syncOp(new SyncOp<V>(true) {
- @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return (V)tx.putAsync(ctx, null, key, val, true, ctx.hasVal()).get().value();
- }
-
- @Override public String toString() {
- return "replace [key=" + key + ", val=" + val + ']';
- }
- });
+ @Nullable @Override public final V getAndReplace(final K key, final V val) throws IgniteCheckedException {
+ return getAndPut(key, val, ctx.hasVal());
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<V> getAndReplaceAsync(final K key, final V val) {
- final boolean statsEnabled = ctx.config().isStatisticsEnabled();
-
- final long start = statsEnabled ? System.nanoTime() : 0L;
-
- A.notNull(key, "key", val, "val");
-
- if (keyCheck)
- validateCacheKey(key);
-
- IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
- return tx.putAsync(ctx, readyTopVer, key, val, true, ctx.hasVal()).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
- }
-
- @Override public String toString() {
- return "replaceAsync [key=" + key + ", val=" + val + ']';
- }
- });
-
- if (statsEnabled)
- fut.listen(new UpdatePutAndGetTimeStatClosure<V>(metrics0(), start));
-
- return fut;
+ @Override public final IgniteInternalFuture<V> getAndReplaceAsync(final K key, final V val) {
+ return getAndPutAsync(key, val, ctx.hasVal());
}
/** {@inheritDoc} */
- @Override public boolean replace(final K key, final V val) throws IgniteCheckedException {
- A.notNull(key, "key", val, "val");
-
- if (keyCheck)
- validateCacheKey(key);
-
- return syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return tx.putAsync(ctx, null, key, val, false, ctx.hasVal()).get().success();
- }
-
- @Override public String toString() {
- return "replacex [key=" + key + ", val=" + val + ']';
- }
- });
+ @Override public final boolean replace(final K key, final V val) throws IgniteCheckedException {
+ return put(key, val, ctx.hasVal());
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> replaceAsync(final K key, final V val) {
- A.notNull(key, "key", val, "val");
-
- if (keyCheck)
- validateCacheKey(key);
-
- return asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
- return tx.putAsync(ctx, readyTopVer, key, val, false, ctx.hasVal()).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
- }
-
- @Override public String toString() {
- return "replacexAsync [key=" + key + ", val=" + val + ']';
- }
- });
+ @Override public final IgniteInternalFuture<Boolean> replaceAsync(final K key, final V val) {
+ return putAsync(key, val, ctx.hasVal());
}
/** {@inheritDoc} */
- @Override public boolean replace(final K key, final V oldVal, final V newVal) throws IgniteCheckedException {
- A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal");
-
- if (keyCheck)
- validateCacheKey(key);
+ @Override public final boolean replace(final K key, final V oldVal, final V newVal) throws IgniteCheckedException {
+ A.notNull(oldVal, "oldVal");
- return syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- // Register before hiding in the filter.
- if (ctx.deploymentEnabled())
- ctx.deploy().registerClass(oldVal);
-
- return tx.putAsync(ctx, null, key, newVal, false, ctx.equalsVal(oldVal)).get()
- .success();
- }
-
- @Override public String toString() {
- return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
- }
- });
+ return put(key, newVal, ctx.equalsVal(oldVal));
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> replaceAsync(final K key, final V oldVal, final V newVal) {
- final boolean statsEnabled = ctx.config().isStatisticsEnabled();
+ A.notNull(oldVal, "oldVal");
- final long start = statsEnabled ? System.nanoTime() : 0L;
-
- A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal");
-
- if (keyCheck)
- validateCacheKey(key);
-
- IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
- // Register before hiding in the filter.
- if (ctx.deploymentEnabled()) {
- try {
- ctx.deploy().registerClass(oldVal);
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- }
-
- return tx.putAsync(ctx, readyTopVer, key, newVal, false, ctx.equalsVal(oldVal)).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
- }
-
- @Override public String toString() {
- return "replaceAsync [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
- }
- });
-
- if (statsEnabled)
- fut.listen(new UpdatePutAndGetTimeStatClosure<Boolean>(metrics0(), start));
-
- return fut;
+ return putAsync(key, newVal, ctx.equalsVal(oldVal));
}
/** {@inheritDoc} */
@Override public void putAll(@Nullable final Map<? extends K, ? extends V> m) throws IgniteCheckedException {
+ if (F.isEmpty(m))
+ return;
+
boolean statsEnabled = ctx.config().isStatisticsEnabled();
long start = statsEnabled ? System.nanoTime() : 0L;
- if (F.isEmpty(m))
- return;
-
if (keyCheck)
validateCacheKeys(m.keySet());
+ putAll0(m);
+
+ if (statsEnabled)
+ metrics0().addPutTimeNanos(System.nanoTime() - start);
+ }
+
+ /**
+ * @param m Map.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected void putAll0(final Map<? extends K, ? extends V> m) throws IgniteCheckedException {
syncOp(new SyncInOp(m.size() == 1) {
@Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.putAllAsync(ctx, null, m, false).get();
@@ -2871,9 +2694,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return "putAll [map=" + m + ']';
}
});
-
- if (statsEnabled)
- metrics0().addPutTimeNanos(System.nanoTime() - start);
}
/** {@inheritDoc} */
@@ -2884,6 +2704,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKeys(m.keySet());
+ return putAllAsync0(m);
+ }
+
+ /**
+ * @param m Map.
+ * @return Future.
+ */
+ protected IgniteInternalFuture<?> putAllAsync0(final Map<? extends K, ? extends V> m) {
return asyncOp(new AsyncOp(m.keySet()) {
@Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
return tx.putAllAsync(ctx,
@@ -2909,11 +2737,25 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKey(key);
+ V prevVal = getAndRemove0(key);
+
+ if (statsEnabled)
+ metrics0().addRemoveAndGetTimeNanos(System.nanoTime() - start);
+
+ return prevVal;
+ }
+
+ /**
+ * @param key Key.
+ * @return Previous value.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected V getAndRemove0(final K key) throws IgniteCheckedException {
final boolean keepBinary = ctx.keepBinary();
- V prevVal = syncOp(new SyncOp<V>(true) {
+ return syncOp(new SyncOp<V>(true) {
@Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- K key0 = keepBinary ? (K)ctx.toCacheKeyObject(key) : key;
+ K key0 = keepBinary ? (K) ctx.toCacheKeyObject(key) : key;
V ret = tx.removeAllAsync(ctx,
null,
@@ -2923,9 +2765,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/*singleRmv*/false).get().value();
if (ctx.config().getInterceptor() != null) {
- K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0;
+ K key = keepBinary ? (K) ctx.unwrapBinaryIfNeeded(key0, true, false) : key0;
- return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
+ return (V) ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
}
return ret;
@@ -2935,11 +2777,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return "remove [key=" + key + ']';
}
});
-
- if (statsEnabled)
- metrics0().addRemoveAndGetTimeNanos(System.nanoTime() - start);
-
- return prevVal;
}
/** {@inheritDoc} */
@@ -2953,7 +2790,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKey(key);
- IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
+ IgniteInternalFuture<V> fut = getAndRemoveAsync0(key);
+
+ if (statsEnabled)
+ fut.listen(new UpdateRemoveTimeStatClosure<V>(metrics0(), start));
+
+ return fut;
+ }
+
+ /**
+ * @param key Key.
+ * @return Future.
+ */
+ protected IgniteInternalFuture<V> getAndRemoveAsync0(final K key) {
+ return asyncOp(new AsyncOp<V>() {
@Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
// TODO should we invoke interceptor here?
return tx.removeAllAsync(ctx,
@@ -2968,11 +2818,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return "removeAsync [key=" + key + ']';
}
});
-
- if (statsEnabled)
- fut.listen(new UpdateRemoveTimeStatClosure<V>(metrics0(), start));
-
- return fut;
}
/** {@inheritDoc} */
@@ -3005,6 +2850,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKeys(keys);
+ removeAll0(keys);
+
+ if (statsEnabled)
+ metrics0().addRemoveTimeNanos(System.nanoTime() - start);
+ }
+
+ /**
+ * @param keys Keys.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected void removeAll0(final Collection<? extends K> keys) throws IgniteCheckedException {
syncOp(new SyncInOp(keys.size() == 1) {
@Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.removeAllAsync(ctx,
@@ -3019,24 +2875,34 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return "removeAll [keys=" + keys + ']';
}
});
-
- if (statsEnabled)
- metrics0().addRemoveTimeNanos(System.nanoTime() - start);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync(@Nullable final Collection<? extends K> keys) {
+ if (F.isEmpty(keys))
+ return new GridFinishedFuture<Object>();
+
final boolean statsEnabled = ctx.config().isStatisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
- if (F.isEmpty(keys))
- return new GridFinishedFuture<Object>();
-
if (keyCheck)
validateCacheKeys(keys);
- IgniteInternalFuture<Object> fut = asyncOp(new AsyncOp(keys) {
+ IgniteInternalFuture<Object> fut = removeAllAsync0(keys);
+
+ if (statsEnabled)
+ fut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start));
+
+ return fut;
+ }
+
+ /**
+ * @param keys Keys.
+ * @return Future.
+ */
+ protected IgniteInternalFuture<Object> removeAllAsync0(final Collection<? extends K> keys) {
+ return asyncOp(new AsyncOp(keys) {
@Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
return tx.removeAllAsync(ctx,
readyTopVer,
@@ -3050,15 +2916,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return "removeAllAsync [keys=" + keys + ']';
}
});
-
- if (statsEnabled)
- fut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start));
-
- return fut;
}
/** {@inheritDoc} */
@Override public boolean remove(final K key) throws IgniteCheckedException {
+ return remove(key, (CacheEntryPredicate)null);
+ }
+
+ /**
+ * @param key Key.
+ * @param filter Filter.
+ * @return {@code True} if entry was removed.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean remove(final K key, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException {
boolean statsEnabled = ctx.config().isStatisticsEnabled();
long start = statsEnabled ? System.nanoTime() : 0L;
@@ -3068,13 +2939,27 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKey(key);
- boolean rmv = syncOp(new SyncOp<Boolean>(true) {
+ boolean rmv = remove0(key, filter);
+
+ if (statsEnabled && rmv)
+ metrics0().addRemoveTimeNanos(System.nanoTime() - start);
+
+ return rmv;
+ }
+
+ /**
+ * @param key Key.
+ * @return {@code True} if entry was removed.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected boolean remove0(final K key, final CacheEntryPredicate filter) throws IgniteCheckedException {
+ Boolean res = syncOp(new SyncOp<Boolean>(true) {
@Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.removeAllAsync(ctx,
null,
Collections.singletonList(key),
/*retval*/false,
- null,
+ filter,
/*singleRmv*/true).get().success();
}
@@ -3083,10 +2968,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
});
- if (statsEnabled && rmv)
- metrics0().addRemoveTimeNanos(System.nanoTime() - start);
+ assert res != null;
- return rmv;
+ return res;
}
/** {@inheritDoc} */
@@ -3111,7 +2995,21 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (keyCheck)
validateCacheKey(key);
- IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
+ IgniteInternalFuture<Boolean> fut = removeAsync0(key, filter);
+
+ if (statsEnabled)
+ fut.listen(new UpdateRemoveTimeStatClosure<Boolean>(metrics0(), start));
+
+ return fut;
+ }
+
+ /**
+ * @param key Key.
+ * @param filter Filter.
+ * @return Future.
+ */
+ protected IgniteInternalFuture<Boolean> removeAsync0(final K key, @Nullable final CacheEntryPredicate filter) {
+ return asyncOp(new AsyncOp<Boolean>() {
@Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
return tx.removeAllAsync(ctx,
readyTopVer,
@@ -3126,11 +3024,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return "removeAsync [key=" + key + ", filter=" + filter + ']';
}
});
-
- if (statsEnabled)
- fut.listen(new UpdateRemoveTimeStatClosure<Boolean>(metrics0(), start));
-
- return fut;
}
/** {@inheritDoc} */
@@ -3172,86 +3065,21 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public boolean remove(final K key, final V val) throws IgniteCheckedException {
- boolean statsEnabled = ctx.config().isStatisticsEnabled();
-
- long start = statsEnabled ? System.nanoTime() : 0L;
-
- A.notNull(key, "key", val, "val");
-
- if (keyCheck)
- validateCacheKey(key);
-
- boolean rmv = syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- // Register before hiding in the filter.
- if (ctx.deploymentEnabled())
- ctx.deploy().registerClass(val);
-
- return tx.removeAllAsync(ctx,
- null,
- Collections.singletonList(key),
- /*retval*/false,
- ctx.equalsVal(val),
- /*singleRmv*/false).get().success();
- }
-
- @Override public String toString() {
- return "remove [key=" + key + ", val=" + val + ']';
- }
- });
+ @Override public final boolean remove(final K key, final V val) throws IgniteCheckedException {
+ A.notNull(val, "val");
- if (statsEnabled && rmv)
- metrics0().addRemoveTimeNanos(System.nanoTime() - start);
-
- return rmv;
+ return remove(key, ctx.equalsVal(val));
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> removeAsync(final K key, final V val) {
- final boolean statsEnabled = ctx.config().isStatisticsEnabled();
-
- final long start = statsEnabled ? System.nanoTime() : 0L;
-
- A.notNull(key, "key", val, "val");
-
- if (keyCheck)
- validateCacheKey(key);
-
- IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
- // Register before hiding in the filter.
- if (ctx.deploymentEnabled()) {
- try {
- ctx.deploy().registerClass(val);
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- }
-
- return tx.removeAllAsync(ctx,
- readyTopVer,
- Collections.singletonList(key),
- /*retval*/false,
- ctx.equalsVal(val),
- /*singleRmv*/false).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
- }
+ @Override public final IgniteInternalFuture<Boolean> removeAsync(final K key, final V val) {
+ A.notNull(val, "val");
- @Override public String toString() {
- return "removeAsync [key=" + key + ", val=" + val + ']';
- }
- });
-
- if (statsEnabled)
- fut.listen(new UpdateRemoveTimeStatClosure<Boolean>(metrics0(), start));
-
- return fut;
+ return removeAsync(key, ctx.equalsVal(val));
}
/** {@inheritDoc} */
- @Override public CacheMetrics clusterMetrics() {
+ @Override public final CacheMetrics clusterMetrics() {
return clusterMetrics(ctx.grid().cluster().forCacheNodes(ctx.name()));
}
@@ -3280,7 +3108,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public CacheMetricsMXBean localMxBean() {
- return localMxBean;
+ return locMxBean;
}
/** {@inheritDoc} */
@@ -4610,9 +4438,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Failed future if waiting was interrupted.
*/
@Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() {
- if (!asyncToggled)
- return null;
-
try {
if (asyncOpsSem != null)
asyncOpsSem.acquire();
@@ -4630,8 +4455,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* Releases asynchronous operations permit, if limited.
*/
- protected void asyncOpRelease() {
- if (asyncOpsSem != null && asyncToggled)
+ private void asyncOpRelease() {
+ if (asyncOpsSem != null)
asyncOpsSem.release();
}
@@ -4794,12 +4619,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Cached value.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public V get(K key, boolean deserializeBinary, final boolean needVer) throws IgniteCheckedException {
- checkJta();
-
+ @Nullable public final V get(K key, boolean deserializeBinary, final boolean needVer) throws IgniteCheckedException {
String taskName = ctx.kernalContext().job().currentTaskName();
- return get(key, taskName, deserializeBinary, needVer);
+ return get0(key, taskName, deserializeBinary, needVer);
}
/**
@@ -4810,11 +4633,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Cached value.
* @throws IgniteCheckedException If failed.
*/
- protected V get(
+ protected V get0(
final K key,
String taskName,
boolean deserializeBinary,
boolean needVer) throws IgniteCheckedException {
+ checkJta();
+
try {
return getAsync(key,
!ctx.config().isReadFromBackup(),
@@ -4868,7 +4693,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Map of cached values.
* @throws IgniteCheckedException If read failed.
*/
- public Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary,
+ protected Map<K, V> getAll0(Collection<? extends K> keys, boolean deserializeBinary,
boolean needVer) throws IgniteCheckedException {
checkJta();
@@ -4923,7 +4748,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param key Cache key.
* @throws IllegalArgumentException If validation fails.
*/
- protected void validateCacheKey(Object key) {
+ protected final void validateCacheKey(Object key) {
if (keyCheck) {
CU.validateCacheKey(key);
@@ -4938,7 +4763,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param keys Cache keys.
* @throws IgniteException If validation fails.
*/
- protected void validateCacheKeys(Iterable<?> keys) {
+ protected final void validateCacheKeys(Iterable<?> keys) {
if (keys == null)
return;
@@ -4959,7 +4784,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param deserializeBinary Deserialize binary flag.
* @return Public API iterator.
*/
- protected Iterator<Cache.Entry<K, V>> iterator(final Iterator<? extends GridCacheEntryEx> it,
+ protected final Iterator<Cache.Entry<K, V>> iterator(final Iterator<? extends GridCacheEntryEx> it,
final boolean deserializeBinary) {
return new Iterator<Cache.Entry<K, V>>() {
{
@@ -5277,7 +5102,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param plc Explicitly specified expiry policy for cache operation.
* @return Expiry policy wrapper.
*/
- @Nullable public IgniteCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) {
+ @Nullable public final IgniteCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) {
if (plc == null)
plc = ctx.expiry();
@@ -5402,7 +5227,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param opCtx Operation context.
* @return Operation future.
*/
- protected IgniteInternalFuture<T> waitTopologyFuture(IgniteInternalFuture<?> topFut,
+ private IgniteInternalFuture<T> waitTopologyFuture(IgniteInternalFuture<?> topFut,
final AffinityTopologyVersion topVer,
final IgniteTxLocalAdapter tx,
final CacheOperationContext opCtx) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c530d47b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 90898f9..f03a3b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -334,14 +334,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/** {@inheritDoc} */
- @Override public IgniteCache<K, V> withAsync() {
- if (delegate instanceof GridCacheAdapter)
- ((GridCacheAdapter)delegate).toggleAsync();
-
- return super.withAsync();
- }
-
- /** {@inheritDoc} */
@Override public IgniteCache<K, V> withSkipStore() {
return skipStore();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c530d47b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index b291bd2..07b9dad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -139,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
/** Pending */
- private GridDeferredAckMessageSender deferredUpdateMessageSender;
+ private GridDeferredAckMessageSender deferredUpdateMsgSnd;
/** */
private GridNearAtomicCache<K, V> near;
@@ -174,6 +174,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override protected void checkJta() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public boolean isDhtAtomic() {
return true;
}
@@ -235,7 +240,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public void start() throws IgniteCheckedException {
super.start();
- deferredUpdateMessageSender = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) {
+ deferredUpdateMsgSnd = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) {
@Override public int getTimeout() {
return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
}
@@ -447,7 +452,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public void stop() {
- deferredUpdateMessageSender.stop();
+ deferredUpdateMsgSnd.stop();
}
/**
@@ -463,7 +468,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override protected V get(K key, String taskName, boolean deserializeBinary, boolean needVer)
+ @Override protected V get0(K key, String taskName, boolean deserializeBinary, boolean needVer)
throws IgniteCheckedException {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -540,6 +545,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override protected Map<K, V> getAll0(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer)
+ throws IgniteCheckedException {
+ return getAllAsyncInternal(keys,
+ !ctx.config().isReadFromBackup(),
+ true,
+ null,
+ ctx.kernalContext().job().currentTaskName(),
+ deserializeBinary,
+ false,
+ true,
+ needVer,
+ false).get();
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
@Nullable final Collection<? extends K> keys,
final boolean forcePrimary,
@@ -551,6 +571,43 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean canRemap,
final boolean needVer
) {
+ return getAllAsyncInternal(keys,
+ forcePrimary,
+ skipTx,
+ subjId,
+ taskName,
+ deserializeBinary,
+ skipVals,
+ canRemap,
+ needVer,
+ true);
+ }
+
+ /**
+ * @param keys Keys.
+ * @param forcePrimary Force primary flag.
+ * @param skipTx Skip tx flag.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param deserializeBinary Deserialize binary flag.
+ * @param skipVals Skip values flag.
+ * @param canRemap Can remap flag.
+ * @param needVer Need version flag.
+ * @param asyncOp Async operation flag.
+ * @return Future.
+ */
+ private IgniteInternalFuture<Map<K, V>> getAllAsyncInternal(
+ @Nullable final Collection<? extends K> keys,
+ final boolean forcePrimary,
+ boolean skipTx,
+ @Nullable UUID subjId,
+ final String taskName,
+ final boolean deserializeBinary,
+ final boolean skipVals,
+ final boolean canRemap,
+ final boolean needVer,
+ boolean asyncOp
+ ) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
if (F.isEmpty(keys))
@@ -561,7 +618,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CacheOperationContext opCtx = ctx.operationContextPerCall();
- subjId = ctx.subjectIdPerCall(null, opCtx);
+ subjId = ctx.subjectIdPerCall(subjId, opCtx);
final UUID subjId0 = subjId;
@@ -569,57 +626,91 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean skipStore = opCtx != null && opCtx.skipStore();
- return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() {
- @Override public IgniteInternalFuture<Map<K, V>> apply() {
- return getAllAsync0(ctx.cacheKeysView(keys),
- forcePrimary,
- subjId0,
- taskName,
- deserializeBinary,
- expiryPlc,
- skipVals,
- skipStore,
- canRemap,
- needVer);
- }
- });
+ if (asyncOp) {
+ return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() {
+ @Override public IgniteInternalFuture<Map<K, V>> apply() {
+ return getAllAsync0(ctx.cacheKeysView(keys),
+ forcePrimary,
+ subjId0,
+ taskName,
+ deserializeBinary,
+ expiryPlc,
+ skipVals,
+ skipStore,
+ canRemap,
+ needVer);
+ }
+ });
+ }
+ else {
+ return getAllAsync0(ctx.cacheKeysView(keys),
+ forcePrimary,
+ subjId0,
+ taskName,
+ deserializeBinary,
+ expiryPlc,
+ skipVals,
+ skipStore,
+ canRemap,
+ needVer);
+ }
}
/** {@inheritDoc} */
- @Override public V getAndPut(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException {
- return getAndPutAsync0(key, val, filter).get();
+ @Override protected V getAndPut0(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException {
+ return (V)update0(
+ key,
+ val,
+ null,
+ null,
+ true,
+ filter,
+ true,
+ false).get();
}
/** {@inheritDoc} */
- @Override public boolean put(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException {
- return putAsync(key, val, filter).get();
+ @Override protected boolean put0(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException {
+ Boolean res = (Boolean)update0(
+ key,
+ val,
+ null,
+ null,
+ false,
+ filter,
+ true,
+ false).get();
+
+ assert res != null;
+
+ return res;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<V> getAndPutAsync0(K key, V val, @Nullable CacheEntryPredicate filter) {
- A.notNull(key, "key", val, "val");
-
- return updateAsync0(
+ return update0(
key,
val,
null,
null,
true,
filter,
+ true,
true);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) {
- return updateAsync0(
+ return update0(
key,
val,
null,
null,
false,
filter,
+ true,
true);
}
@@ -627,84 +718,34 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException {
A.notNull(key, "key", val, "val");
- return (V)updateAsync0(
+ return (V) update0(
key,
val,
null,
null,
true,
null,
+ false,
false).get();
}
/** {@inheritDoc} */
- @Override public V getAndPutIfAbsent(K key, V val) throws IgniteCheckedException {
- return getAndPutIfAbsentAsync(key, val).get();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<V> getAndPutIfAbsentAsync(K key, V val) {
- A.notNull(key, "key", val, "val");
-
- return getAndPutAsync(key, val, ctx.noVal());
- }
-
- /** {@inheritDoc} */
- @Override public boolean putIfAbsent(K key, V val) throws IgniteCheckedException {
- return putIfAbsentAsync(key, val).get();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> putIfAbsentAsync(K key, V val) {
- A.notNull(key, "key", val, "val");
-
- return putAsync(key, val, ctx.noVal());
- }
-
- /** {@inheritDoc} */
- @Override public V getAndReplace(K key, V val) throws IgniteCheckedException {
- return getAndReplaceAsync(key, val).get();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<V> getAndReplaceAsync(K key, V val) {
- A.notNull(key, "key", val, "val");
-
- return getAndPutAsync(key, val, ctx.hasVal());
- }
-
- /** {@inheritDoc} */
- @Override public boolean replace(K key, V val) throws IgniteCheckedException {
- return replaceAsync(key, val).get();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V val) {
- A.notNull(key, "key", val, "val");
-
- return putAsync(key, val, ctx.hasVal());
- }
-
- /** {@inheritDoc} */
- @Override public boolean replace(K key, V oldVal, V newVal) throws IgniteCheckedException {
- return replaceAsync(key, oldVal, newVal).get();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
- A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal");
-
- return putAsync(key, newVal, ctx.equalsVal(oldVal));
- }
-
- /** {@inheritDoc} */
- @Override public void putAll(Map<? extends K, ? extends V> m) throws IgniteCheckedException {
- putAllAsync(m).get();
+ @Override protected void putAll0(Map<? extends K, ? extends V> m) throws IgniteCheckedException {
+ updateAll0(m,
+ null,
+ null,
+ null,
+ null,
+ false,
+ false,
+ true,
+ UPDATE,
+ false).get();
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> putAllAsync(Map<? extends K, ? extends V> m) {
- return updateAllAsync0(m,
+ @Override public IgniteInternalFuture<?> putAllAsync0(Map<? extends K, ? extends V> m) {
+ return updateAll0(m,
null,
null,
null,
@@ -712,7 +753,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
true,
- UPDATE).chain(RET2NULL);
+ UPDATE,
+ true).chain(RET2NULL);
}
/** {@inheritDoc} */
@@ -725,7 +767,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public IgniteInternalFuture<?> putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) {
ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
- return updateAllAsync0(null,
+ return updateAll0(null,
null,
null,
conflictMap,
@@ -733,57 +775,40 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
true,
- UPDATE);
+ UPDATE,
+ true);
}
/** {@inheritDoc} */
- @Override public V getAndRemove(K key) throws IgniteCheckedException {
- return getAndRemoveAsync(key).get();
+ @Override public V getAndRemove0(K key) throws IgniteCheckedException {
+ return (V)remove0(key, true, null, false).get();
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public IgniteInternalFuture<V> getAndRemoveAsync(K key) {
- A.notNull(key, "key");
-
- return removeAsync0(key, true, null);
+ @Override public IgniteInternalFuture<V> getAndRemoveAsync0(K key) {
+ return remove0(key, true, null, true);
}
/** {@inheritDoc} */
- @Override public void removeAll(Collection<? extends K> keys) throws IgniteCheckedException {
- removeAllAsync(keys).get();
+ @Override protected void removeAll0(Collection<? extends K> keys) throws IgniteCheckedException {
+ removeAllAsync0(keys, null, false, false, false).get();
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> removeAllAsync(Collection<? extends K> keys) {
- A.notNull(keys, "keys");
-
- return removeAllAsync0(keys, null, false, false).chain(RET2NULL);
+ @Override public IgniteInternalFuture<Object> removeAllAsync0(Collection<? extends K> keys) {
+ return removeAllAsync0(keys, null, false, false, true).chain(RET2NULL);
}
/** {@inheritDoc} */
- @Override public boolean remove(K key) throws IgniteCheckedException {
- return removeAsync(key, (CacheEntryPredicate)null).get();
+ @Override protected boolean remove0(K key, CacheEntryPredicate filter) throws IgniteCheckedException {
+ return (Boolean)remove0(key, false, filter, false).get();
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public IgniteInternalFuture<Boolean> removeAsync(K key, @Nullable CacheEntryPredicate filter) {
- A.notNull(key, "key");
-
- return removeAsync0(key, false, filter);
- }
-
- /** {@inheritDoc} */
- @Override public boolean remove(K key, V val) throws IgniteCheckedException {
- return removeAsync(key, val).get();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> removeAsync(K key, V val) {
- A.notNull(key, "key", val, "val");
-
- return removeAsync(key, ctx.equalsVal(val));
+ @Override public IgniteInternalFuture<Boolean> removeAsync0(K key, @Nullable CacheEntryPredicate filter) {
+ return remove0(key, false, filter, true);
}
/** {@inheritDoc} */
@@ -796,7 +821,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> conflictMap) {
ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
- return removeAllAsync0(null, conflictMap, false, false);
+ return removeAllAsync0(null, conflictMap, false, false, true);
}
/**
@@ -811,10 +836,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @return Future.
*/
@SuppressWarnings("unchecked")
- protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) {
- if (!asyncToggled)
- return op.apply();
-
+ private <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) {
IgniteInternalFuture<T> fail = asyncOpAcquire();
if (fail != null)
@@ -871,7 +893,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
throws IgniteCheckedException {
- IgniteInternalFuture<EntryProcessorResult<T>> invokeFut = invokeAsync(key, entryProcessor, args);
+ IgniteInternalFuture<EntryProcessorResult<T>> invokeFut = invoke0(false, key, entryProcessor, args);
EntryProcessorResult<T> res = invokeFut.get();
@@ -881,16 +903,30 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
EntryProcessor<K, V, T> entryProcessor,
- Object... args)
- throws IgniteCheckedException {
- return invokeAllAsync(keys, entryProcessor, args).get();
+ Object... args) throws IgniteCheckedException
+ {
+ return invokeAll0(false, keys, entryProcessor, args).get();
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public <T> IgniteInternalFuture<EntryProcessorResult<T>> invokeAsync(K key,
EntryProcessor<K, V, T> entryProcessor,
Object... args) {
+ return invoke0(true, key, entryProcessor, args);
+ }
+
+ /**
+ * @param async Async operation flag.
+ * @param key Key.
+ * @param entryProcessor Entry processor.
+ * @param args Entry processor arguments.
+ * @return Future.
+ */
+ private <T> IgniteInternalFuture<EntryProcessorResult<T>> invoke0(
+ boolean async,
+ K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
A.notNull(key, "key", entryProcessor, "entryProcessor");
if (keyCheck)
@@ -900,14 +936,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
- IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut = updateAsync0(
+ IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut = update0(
key,
null,
entryProcessor,
args,
false,
null,
- true);
+ true,
+ async);
return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
@Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut)
@@ -940,6 +977,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
final EntryProcessor<K, V, T> entryProcessor,
Object... args) {
+ return invokeAll0(true, keys, entryProcessor, args);
+ }
+
+ /**
+ * @param async Async operation flag.
+ * @param keys Keys.
+ * @param entryProcessor Entry processor.
+ * @param args Entry processor arguments.
+ * @return Future.
+ */
+ private <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAll0(
+ boolean async,
+ Set<? extends K> keys,
+ final EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
A.notNull(keys, "keys", entryProcessor, "entryProcessor");
if (keyCheck)
@@ -955,7 +1007,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
- IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> resFut = updateAllAsync0(null,
+ IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> resFut = updateAll0(null,
invokeMap,
args,
null,
@@ -963,7 +1015,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
true,
- TRANSFORM);
+ TRANSFORM,
+ async);
return resFut.chain(
new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() {
@@ -981,7 +1034,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
Object... args) throws IgniteCheckedException {
- return invokeAllAsync(map, args).get();
+ A.notNull(map, "map");
+
+ if (keyCheck)
+ validateCacheKeys(map.keySet());
+
+ return (Map<K, EntryProcessorResult<T>>)updateAll0(null,
+ map,
+ args,
+ null,
+ null,
+ false,
+ false,
+ true,
+ TRANSFORM,
+ false).get();
}
/** {@inheritDoc} */
@@ -994,7 +1061,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (keyCheck)
validateCacheKeys(map.keySet());
- return updateAllAsync0(null,
+ return updateAll0(null,
map,
args,
null,
@@ -1002,7 +1069,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
true,
- TRANSFORM);
+ TRANSFORM,
+ true);
}
/**
@@ -1016,10 +1084,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param retval Return value required flag.
* @param rawRetval Return {@code GridCacheReturn} instance.
* @param waitTopFut Whether to wait for topology future.
+ * @param async Async operation flag.
* @return Completion future.
*/
@SuppressWarnings("ConstantConditions")
- private IgniteInternalFuture updateAllAsync0(
+ private IgniteInternalFuture updateAll0(
@Nullable Map<? extends K, ? extends V> map,
@Nullable Map<? extends K, ? extends EntryProcessor> invokeMap,
@Nullable Object[] invokeArgs,
@@ -1028,7 +1097,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean retval,
final boolean rawRetval,
final boolean waitTopFut,
- final GridCacheOperation op
+ final GridCacheOperation op,
+ boolean async
) {
assert ctx.updatesAllowed();
@@ -1105,13 +1175,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
waitTopFut);
- return asyncOp(new CO<IgniteInternalFuture<Object>>() {
- @Override public IgniteInternalFuture<Object> apply() {
- updateFut.map();
+ if (async) {
+ return asyncOp(new CO<IgniteInternalFuture<Object>>() {
+ @Override public IgniteInternalFuture<Object> apply() {
+ updateFut.map();
- return updateFut;
- }
- });
+ return updateFut;
+ }
+ });
+ }
+ else {
+ updateFut.map();
+
+ return updateFut;
+ }
}
/**
@@ -1124,16 +1201,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param retval Return value flag.
* @param filter Filter.
* @param waitTopFut Whether to wait for topology future.
+ * @param async Async operation flag.
* @return Future.
*/
- private IgniteInternalFuture updateAsync0(
+ private IgniteInternalFuture update0(
K key,
@Nullable V val,
@Nullable EntryProcessor proc,
@Nullable Object[] invokeArgs,
final boolean retval,
@Nullable final CacheEntryPredicate filter,
- final boolean waitTopFut
+ final boolean waitTopFut,
+ boolean async
) {
assert val == null || proc == null;
@@ -1146,13 +1225,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final GridNearAtomicAbstractUpdateFuture updateFut =
createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter, waitTopFut);
- return asyncOp(new CO<IgniteInternalFuture<Object>>() {
- @Override public IgniteInternalFuture<Object> apply() {
- updateFut.map();
+ if (async) {
+ return asyncOp(new CO<IgniteInternalFuture<Object>>() {
+ @Override public IgniteInternalFuture<Object> apply() {
+ updateFut.map();
- return updateFut;
- }
- });
+ return updateFut;
+ }
+ });
+ }
+ else {
+ updateFut.map();
+
+ return updateFut;
+ }
}
/**
@@ -1161,33 +1247,38 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param key Key.
* @param retval Whether to return
* @param filter Filter.
+ * @param async Async operation flag.
* @return Future.
*/
- private IgniteInternalFuture removeAsync0(K key, final boolean retval,
- @Nullable CacheEntryPredicate filter) {
- final boolean statsEnabled = ctx.config().isStatisticsEnabled();
-
- final long start = statsEnabled ? System.nanoTime() : 0L;
-
+ private IgniteInternalFuture remove0(K key, final boolean retval,
+ @Nullable CacheEntryPredicate filter,
+ boolean async) {
assert ctx.updatesAllowed();
- validateCacheKey(key);
-
ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
- final GridNearAtomicAbstractUpdateFuture updateFut =
- createSingleUpdateFuture(key, null, null, null, retval, filter, true);
+ final GridNearAtomicAbstractUpdateFuture updateFut = createSingleUpdateFuture(key,
+ null,
+ null,
+ null,
+ retval,
+ filter,
+ true);
- if (statsEnabled)
- updateFut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start));
+ if (async) {
+ return asyncOp(new CO<IgniteInternalFuture<Object>>() {
+ @Override public IgniteInternalFuture<Object> apply() {
+ updateFut.map();
- return asyncOp(new CO<IgniteInternalFuture<Object>>() {
- @Override public IgniteInternalFuture<Object> apply() {
- updateFut.map();
+ return updateFut;
+ }
+ });
+ }
+ else {
+ updateFut.map();
- return updateFut;
- }
- });
+ return updateFut;
+ }
}
/**
@@ -1326,14 +1417,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable Collection<? extends K> keys,
@Nullable Map<KeyCacheObject, GridCacheVersion> conflictMap,
final boolean retval,
- boolean rawRetval
+ boolean rawRetval,
+ boolean async
) {
assert ctx.updatesAllowed();
- final boolean statsEnabled = ctx.config().isStatisticsEnabled();
-
- final long start = statsEnabled ? System.nanoTime() : 0L;
-
assert keys != null || conflictMap != null;
if (keyCheck)
@@ -1380,16 +1468,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
true);
- if (statsEnabled)
- updateFut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start));
+ if (async) {
+ return asyncOp(new CO<IgniteInternalFuture<Object>>() {
+ @Override public IgniteInternalFuture<Object> apply() {
+ updateFut.map();
- return asyncOp(new CO<IgniteInternalFuture<Object>>() {
- @Override public IgniteInternalFuture<Object> apply() {
- updateFut.map();
+ return updateFut;
+ }
+ });
+ }
+ else {
+ updateFut.map();
- return updateFut;
- }
- });
+ return updateFut;
+ }
}
/**
@@ -3248,7 +3340,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param ver Version to ack.
*/
private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) {
- deferredUpdateMessageSender.sendDeferredAckMessage(nodeId, ver);
+ deferredUpdateMsgSnd.sendDeferredAckMessage(nodeId, ver);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c530d47b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 176a90f..9cf400d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -327,19 +327,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
needVer);
}
- /** {@inheritDoc} */
- @Override protected GridCacheEntryEx entryExSafe(
- KeyCacheObject key,
- AffinityTopologyVersion topVer
- ) {
- try {
- return ctx.affinity().localNode(key, topVer) ? entryEx(key) : null;
- }
- catch (GridDhtInvalidPartitionException ignored) {
- return null;
- }
- }
-
/**
* @param keys Keys to load.
* @param readThrough Read through flag.