You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/08 13:17:04 UTC
[19/28] ignite git commit:
https://issues.apache.org/jira/browse/IGNITE-4393
https://issues.apache.org/jira/browse/IGNITE-4393
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07573183
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07573183
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07573183
Branch: refs/heads/ignite-4371
Commit: 075731835368a0c4a4e36e796105553c38ce41af
Parents: d8ce5af
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Dec 8 12:01:18 2016 +0700
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Dec 8 12:01:18 2016 +0700
----------------------------------------------------------------------
.../java/org/apache/ignite/BenchAtomic.java | 24 ++++++++++----------
.../internal/GridPerformanceSuggestions.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 24 +++++++++++++++++---
.../processors/cache/IgniteCacheProxy.java | 8 +++++++
.../dht/atomic/GridDhtAtomicCache.java | 5 ++--
.../local/atomic/GridLocalAtomicCache.java | 3 +++
6 files changed, 48 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java b/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
index 4f99123..fdaf56c 100644
--- a/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
+++ b/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
@@ -132,7 +132,7 @@ public class BenchAtomic {
final IgniteCache<Integer, byte[]> cache0 = ignite.getOrCreateCache(
BenchAtomic.<Integer, byte[]>cacheConfig(writeSync));
- final IgniteCache<Integer, byte[]> asyncCache = cache0.withAsync();
+// final IgniteCache<Integer, byte[]> asyncCache = cache0.withAsync();
final Semaphore sem = new Semaphore(2048);
@@ -176,17 +176,17 @@ public class BenchAtomic {
int key = ThreadLocalRandom.current().nextInt(KEYS);
- if (async) {
- sem.acquireUninterruptibly();
-
- asyncCache.put(key, val);
-
- IgniteFuture<Object> f = asyncCache.future();
-
- f.listen(lsnr);
-
- continue;
- }
+// if (async) {
+// sem.acquireUninterruptibly();
+//
+// asyncCache.put(key, val);
+//
+// IgniteFuture<Object> f = asyncCache.future();
+//
+// f.listen(lsnr);
+//
+// continue;
+// }
boolean startTx = cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() ==
CacheAtomicityMode.TRANSACTIONAL;
http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
index b040a97..5e8e520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
@@ -89,4 +89,4 @@ public class GridPerformanceSuggestions {
@Override public String toString() {
return S.toString(GridPerformanceSuggestions.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index eb0a8d9..a8d9f1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -288,6 +288,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** Asynchronous operations limit semaphore. */
private Semaphore asyncOpsSem;
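+ /** Set to {@code true} by {@code toggleAsync()}; while {@code false}, async operation permits are neither acquired nor released. */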
+ protected volatile boolean asyncToggled;
+
/** {@inheritDoc} */
@Override public String name() {
return cacheCfg.getName();
@@ -364,6 +367,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * Sets the async flag (it is never reset) once someone calls {@code withAsync()}
+ * on a proxy. From that point on all cache operations (sync and async)
+ * have to be handled so that they are put in the proper sequence.
+ *
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-4393
+ */
+ void toggleAsync() {
+ if (!asyncToggled)
+ asyncToggled = true;
+ }
+
+ /**
* Prints memory stats.
*/
public void printMemoryStats() {
@@ -2534,6 +2549,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Put future.
*/
public IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate filter) {
+ A.notNull(key, "key", val, "val");
+
final boolean statsEnabled = ctx.config().isStatisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -2554,8 +2571,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val,
@Nullable final CacheEntryPredicate filter) {
- A.notNull(key, "key", val, "val");
-
if (keyCheck)
validateCacheKey(key);
@@ -4592,6 +4607,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Failed future if waiting was interrupted.
*/
@Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() {
+ if (!asyncToggled)
+ return null;
+
try {
if (asyncOpsSem != null)
asyncOpsSem.acquire();
@@ -4610,7 +4628,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* Releases asynchronous operations permit, if limited.
*/
protected void asyncOpRelease() {
- if (asyncOpsSem != null)
+ if (asyncOpsSem != null && asyncToggled)
asyncOpsSem.release();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index f87fa1d..b9e6e82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -334,6 +334,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/** {@inheritDoc} */
+ @Override public IgniteCache<K, V> withAsync() {
+ if (delegate instanceof GridCacheAdapter)
+ ((GridCacheAdapter)delegate).toggleAsync();
+
+ return super.withAsync();
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteCache<K, V> withSkipStore() {
return skipStore();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 940c74e..0e60ff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -613,8 +613,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) {
- A.notNull(key, "key", val, "val");
-
return updateAsync0(
key,
val,
@@ -814,6 +812,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@SuppressWarnings("unchecked")
protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) {
+ if (!asyncToggled)
+ return op.apply();
+
IgniteInternalFuture<T> fail = asyncOpAcquire();
if (fail != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index a419887..bc16ff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1585,6 +1585,9 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
*/
@SuppressWarnings("unchecked")
protected IgniteInternalFuture asyncOp(final Callable<?> op) {
+ if (!asyncToggled)
+ return ctx.closures().callLocalSafe(op);
+
IgniteInternalFuture fail = asyncOpAcquire();
if (fail != null)