You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/26 11:16:31 UTC
[15/50] [abbrv] ignite git commit: ignite-2412 Do not call 'asyncOp' for synchronous operations
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad785cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index a8219b0..4350b3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -450,61 +450,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public V getAndPutIfAbsent(K key, V val) throws IgniteCheckedException {
- return dht.getAndPutIfAbsent(key, val);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<V> getAndPutIfAbsentAsync(K key, V val) {
- return dht.getAndPutIfAbsentAsync(key, val);
- }
-
- /** {@inheritDoc} */
- @Override public boolean putIfAbsent(K key, V val) throws IgniteCheckedException {
- return dht.putIfAbsent(key, val);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> putIfAbsentAsync(K key, V val) {
- return dht.putIfAbsentAsync(key, val);
- }
-
- /** {@inheritDoc} */
@Nullable @Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException {
return dht.tryGetAndPut(key, val);
}
/** {@inheritDoc} */
- @Override public V getAndReplace(K key, V val) throws IgniteCheckedException {
- return dht.getAndReplace(key, val);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<V> getAndReplaceAsync(K key, V val) {
- return dht.getAndReplaceAsync(key, val);
- }
-
- /** {@inheritDoc} */
- @Override public boolean replace(K key, V val) throws IgniteCheckedException {
- return dht.replace(key, val);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V val) {
- return dht.replaceAsync(key, val);
- }
-
- /** {@inheritDoc} */
- @Override public boolean replace(K key, V oldVal, V newVal) throws IgniteCheckedException {
- return dht.replace(key, oldVal, newVal);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
- return dht.replaceAsync(key, oldVal, newVal);
- }
-
- /** {@inheritDoc} */
@Override public void putAll(Map<? extends K, ? extends V> m)
throws IgniteCheckedException {
dht.putAll(m);
@@ -569,6 +519,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override public boolean remove(K key, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException {
+ return dht.remove(key, filter);
+ }
+
+ /** {@inheritDoc} */
@Override public V getAndRemove(K key) throws IgniteCheckedException {
return dht.getAndRemove(key);
}
@@ -602,16 +557,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public boolean remove(K key, V val) throws IgniteCheckedException {
- return dht.remove(key, val);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> removeAsync(K key, V val) {
- return dht.removeAsync(key, val);
- }
-
- /** {@inheritDoc} */
@Override public void removeAll() throws IgniteCheckedException {
dht.removeAll();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad785cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index bc16ff4..a26d2f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -58,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.resource.GridResourceIoc;
-import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
@@ -108,6 +107,11 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
}
/** {@inheritDoc} */
+ @Override protected void checkJta() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public boolean isLocal() {
return true;
}
@@ -119,9 +123,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public V getAndPut(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException {
- A.notNull(key, "key", val, "val");
-
+ @Override protected V getAndPut0(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException {
CacheOperationContext opCtx = ctx.operationContextPerCall();
return (V)updateAllInternal(UPDATE,
@@ -138,16 +140,10 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
}
/** {@inheritDoc} */
- @Override public boolean put(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException {
- A.notNull(key, "key", val, "val");
-
- boolean statsEnabled = ctx.config().isStatisticsEnabled();
-
- long start = statsEnabled ? System.nanoTime() : 0L;
-
+ @Override protected boolean put0(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException {
CacheOperationContext opCtx = ctx.operationContextPerCall();
- boolean res = (Boolean)updateAllInternal(UPDATE,
+ Boolean res = (Boolean)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
null,
@@ -159,8 +155,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
ctx.readThrough(),
opCtx != null && opCtx.isKeepBinary());
- if (statsEnabled)
- metrics0().addPutTimeNanos(System.nanoTime() - start);
+ assert res != null;
return res;
}
@@ -168,8 +163,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<V> getAndPutAsync0(K key, V val, @Nullable CacheEntryPredicate filter) {
- A.notNull(key, "key", val, "val");
-
return updateAllAsync0(F0.asMap(key, val),
null,
null,
@@ -181,8 +174,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) {
- A.notNull(key, "key", val, "val");
-
return updateAllAsync0(F0.asMap(key, val),
null,
null,
@@ -192,65 +183,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public V getAndPutIfAbsent(K key, V val) throws IgniteCheckedException {
- return getAndPut(key, val, ctx.noVal());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<V> getAndPutIfAbsentAsync(K key, V val) {
- return getAndPutAsync(key, val, ctx.noVal());
- }
-
- /** {@inheritDoc} */
- @Override public boolean putIfAbsent(K key, V val) throws IgniteCheckedException {
- return put(key, val, ctx.noVal());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> putIfAbsentAsync(K key, V val) {
- return putAsync(key, val, ctx.noVal());
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public V getAndReplace(K key, V val) throws IgniteCheckedException {
- return getAndPut(key, val, ctx.hasVal());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<V> getAndReplaceAsync(K key, V val) {
- return getAndPutAsync(key, val, ctx.hasVal());
- }
-
- /** {@inheritDoc} */
- @Override public boolean replace(K key, V val) throws IgniteCheckedException {
- return put(key, val, ctx.hasVal());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V val) {
- return putAsync(key, val, ctx.hasVal());
- }
-
- /** {@inheritDoc} */
- @Override public boolean replace(K key, V oldVal, V newVal) throws IgniteCheckedException {
- A.notNull(oldVal, "oldVal");
-
- return put(key, newVal, ctx.equalsVal(oldVal));
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
- return putAsync(key, newVal, ctx.equalsVal(oldVal));
- }
-
- /** {@inheritDoc} */
- @Override public void putAll(Map<? extends K, ? extends V> m) throws IgniteCheckedException {
- boolean statsEnabled = ctx.config().isStatisticsEnabled();
-
- long start = statsEnabled ? System.nanoTime() : 0L;
-
+ @Override protected void putAll0(Map<? extends K, ? extends V> m) throws IgniteCheckedException {
CacheOperationContext opCtx = ctx.operationContextPerCall();
updateAllInternal(UPDATE,
@@ -264,13 +197,10 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
ctx.writeThrough(),
ctx.readThrough(),
opCtx != null && opCtx.isKeepBinary());
-
- if (statsEnabled)
- metrics0().addPutTimeNanos(System.nanoTime() - start);
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> putAllAsync(Map<? extends K, ? extends V> m) {
+ @Override public IgniteInternalFuture<?> putAllAsync0(Map<? extends K, ? extends V> m) {
return updateAllAsync0(m,
null,
null,
@@ -280,8 +210,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public V getAndRemove(K key) throws IgniteCheckedException {
+ @Override protected V getAndRemove0(K key) throws IgniteCheckedException {
CacheOperationContext opCtx = ctx.operationContextPerCall();
return (V)updateAllInternal(DELETE,
@@ -299,13 +228,13 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public IgniteInternalFuture<V> getAndRemoveAsync(K key) {
+ @Override public IgniteInternalFuture<V> getAndRemoveAsync0(K key) {
return removeAllAsync0(Collections.singletonList(key), true, false, null);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public void removeAll(Collection<? extends K> keys) throws IgniteCheckedException {
+ @Override public void removeAll0(Collection<? extends K> keys) throws IgniteCheckedException {
CacheOperationContext opCtx = ctx.operationContextPerCall();
updateAllInternal(DELETE,
@@ -322,19 +251,13 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> removeAllAsync(Collection<? extends K> keys) {
+ @Override public IgniteInternalFuture<Object> removeAllAsync0(Collection<? extends K> keys) {
return removeAllAsync0(keys, false, false, null).chain(RET2NULL);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public boolean remove(K key) throws IgniteCheckedException {
- boolean statsEnabled = ctx.config().isStatisticsEnabled();
-
- long start = statsEnabled ? System.nanoTime() : 0L;
-
- A.notNull(key, "key");
-
+ @Override public boolean remove0(K key, final CacheEntryPredicate filter) throws IgniteCheckedException {
CacheOperationContext opCtx = ctx.operationContextPerCall();
Boolean rmv = (Boolean)updateAllInternal(DELETE,
@@ -344,50 +267,23 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
expiryPerCall(),
false,
false,
- null,
+ filter,
ctx.writeThrough(),
ctx.readThrough(),
opCtx != null && opCtx.isKeepBinary());
- if (statsEnabled && rmv)
- metrics0().addRemoveTimeNanos(System.nanoTime() - start);
+ assert rmv != null;
return rmv;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public IgniteInternalFuture<Boolean> removeAsync(K key, @Nullable CacheEntryPredicate filter) {
- A.notNull(key, "key");
-
+ @Override public IgniteInternalFuture<Boolean> removeAsync0(K key, @Nullable CacheEntryPredicate filter) {
return removeAllAsync0(Collections.singletonList(key), false, false, filter);
}
/** {@inheritDoc} */
- @Override public boolean remove(K key, V val) throws IgniteCheckedException {
- A.notNull(key, "key", val, "val");
-
- CacheOperationContext opCtx = ctx.operationContextPerCall();
-
- return (Boolean)updateAllInternal(DELETE,
- Collections.singleton(key),
- null,
- null,
- expiryPerCall(),
- false,
- false,
- ctx.equalsVal(val),
- ctx.writeThrough(),
- ctx.readThrough(),
- opCtx != null && opCtx.isKeepBinary());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> removeAsync(K key, V val) {
- return removeAsync(key, ctx.equalsVal(val));
- }
-
- /** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync() {
return ctx.closures().callLocalSafe(new Callable<Void>() {
@Override public Void call() throws Exception {
@@ -399,11 +295,13 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
}
/** {@inheritDoc} */
-
@SuppressWarnings("unchecked")
- @Override @Nullable public V get(K key, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException {
- String taskName = ctx.kernalContext().job().currentTaskName();
-
+ @Override protected V get0(
+ final K key,
+ String taskName,
+ boolean deserializeBinary,
+ boolean needVer) throws IgniteCheckedException
+ {
Map<K, V> m = getAllInternal(Collections.singleton(key),
ctx.isSwapOrOffheapEnabled(),
ctx.readThrough(),
@@ -419,7 +317,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public final Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer)
+ @Override public final Map<K, V> getAll0(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer)
throws IgniteCheckedException {
A.notNull(keys, "keys");
@@ -794,7 +692,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
- IgniteInternalFuture fut = asyncOp(new Callable<Object>() {
+ return asyncOp(new Callable<Object>() {
@Override public Object call() throws Exception {
return updateAllInternal(op,
keys,
@@ -809,11 +707,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
keepBinary);
}
});
-
- if (ctx.config().isStatisticsEnabled())
- fut.listen(new UpdatePutTimeStatClosure(metrics0(), System.nanoTime()));
-
- return fut;
}
/**
@@ -835,17 +728,13 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
final boolean readThrough = ctx.readThrough();
- final boolean statsEnabled = ctx.config().isStatisticsEnabled();
-
- final long start = statsEnabled ? System.nanoTime() : 0L;
-
final ExpiryPolicy expiryPlc = expiryPerCall();
CacheOperationContext opCtx = ctx.operationContextPerCall();
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
- IgniteInternalFuture fut = asyncOp(new Callable<Object>() {
+ return asyncOp(new Callable<Object>() {
@Override public Object call() throws Exception {
return updateAllInternal(DELETE,
keys,
@@ -860,11 +749,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
keepBinary);
}
});
-
- if (statsEnabled)
- fut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start));
-
- return fut;
}
/**
@@ -1584,10 +1468,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
* @return Future.
*/
@SuppressWarnings("unchecked")
- protected IgniteInternalFuture asyncOp(final Callable<?> op) {
- if (!asyncToggled)
- return ctx.closures().callLocalSafe(op);
-
+ private IgniteInternalFuture asyncOp(final Callable<?> op) {
IgniteInternalFuture fail = asyncOpAcquire();
if (fail != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad785cbd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java
index 3e3b84e..648134e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java
@@ -34,7 +34,7 @@ public class CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest extend
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- cfg.getTransactionConfiguration().setDefaultTxTimeout(Long.MAX_VALUE);
+ cfg.getTransactionConfiguration().setDefaultTxTimeout(5 * 60_000);
return cfg;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad785cbd/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
index ec3b808..57c709b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.igfs;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteFileSystem;
@@ -39,14 +41,11 @@ import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.transactions.TransactionConcurrency;
-import org.apache.ignite.transactions.TransactionIsolation;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Test to check for system pool starvation due to {@link IgfsBlocksMessage}.
@@ -125,8 +124,7 @@ public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbst
@Override public Void call() throws Exception {
GridCacheAdapter dataCache = dataCache(attacker);
- try (IgniteInternalTx tx =
- dataCache.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ try (IgniteInternalTx tx = dataCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
dataCache.put(DATA_KEY, 0);
txStartLatch.countDown();
@@ -185,6 +183,7 @@ public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbst
* Create IGFS file asynchronously.
*
* @param path Path.
+ * @param writeStartLatch Write start latch.
* @return Future.
*/
private IgniteInternalFuture<Void> createFileAsync(final IgfsPath path, final CountDownLatch writeStartLatch) {
@@ -265,6 +264,7 @@ public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbst
cfg.setLocalHost("127.0.0.1");
cfg.setConnectorConfiguration(null);
+ cfg.setStripedPoolSize(0);
cfg.setSystemThreadPoolSize(2);
cfg.setRebalanceThreadPoolSize(1);
cfg.setPublicThreadPoolSize(1);