You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/08 13:17:04 UTC

[19/28] ignite git commit: https://issues.apache.org/jira/browse/IGNITE-4393

https://issues.apache.org/jira/browse/IGNITE-4393


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07573183
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07573183
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07573183

Branch: refs/heads/ignite-4371
Commit: 075731835368a0c4a4e36e796105553c38ce41af
Parents: d8ce5af
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Dec 8 12:01:18 2016 +0700
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Dec 8 12:01:18 2016 +0700

----------------------------------------------------------------------
 .../java/org/apache/ignite/BenchAtomic.java     | 24 ++++++++++----------
 .../internal/GridPerformanceSuggestions.java    |  2 +-
 .../processors/cache/GridCacheAdapter.java      | 24 +++++++++++++++++---
 .../processors/cache/IgniteCacheProxy.java      |  8 +++++++
 .../dht/atomic/GridDhtAtomicCache.java          |  5 ++--
 .../local/atomic/GridLocalAtomicCache.java      |  3 +++
 6 files changed, 48 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java b/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
index 4f99123..fdaf56c 100644
--- a/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
+++ b/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
@@ -132,7 +132,7 @@ public class BenchAtomic {
         final IgniteCache<Integer, byte[]> cache0 = ignite.getOrCreateCache(
             BenchAtomic.<Integer, byte[]>cacheConfig(writeSync));
 
-        final IgniteCache<Integer, byte[]> asyncCache = cache0.withAsync();
+//        final IgniteCache<Integer, byte[]> asyncCache = cache0.withAsync();
 
         final Semaphore sem = new Semaphore(2048);
 
@@ -176,17 +176,17 @@ public class BenchAtomic {
 
                             int key = ThreadLocalRandom.current().nextInt(KEYS);
 
-                            if (async) {
-                                sem.acquireUninterruptibly();
-
-                                asyncCache.put(key, val);
-
-                                IgniteFuture<Object> f = asyncCache.future();
-
-                                f.listen(lsnr);
-
-                                continue;
-                            }
+//                            if (async) {
+//                                sem.acquireUninterruptibly();
+//
+//                                asyncCache.put(key, val);
+//
+//                                IgniteFuture<Object> f = asyncCache.future();
+//
+//                                f.listen(lsnr);
+//
+//                                continue;
+//                            }
 
                             boolean startTx = cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() ==
                                 CacheAtomicityMode.TRANSACTIONAL;

http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
index b040a97..5e8e520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
@@ -89,4 +89,4 @@ public class GridPerformanceSuggestions {
     @Override public String toString() {
         return S.toString(GridPerformanceSuggestions.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index eb0a8d9..a8d9f1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -288,6 +288,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /** Asynchronous operations limit semaphore. */
     private Semaphore asyncOpsSem;
 
+    /** */
+    protected volatile boolean asyncToggled;
+
     /** {@inheritDoc} */
     @Override public String name() {
         return cacheCfg.getName();
@@ -364,6 +367,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
+     * Toggles async flag if someone calls {@code withAsync()}
+     * on proxy and since that we have to properly handle all cache
+     * operations (sync and async) to put them in proper sequence.
+     *
+     * TODO: https://issues.apache.org/jira/browse/IGNITE-4393
+     */
+    void toggleAsync() {
+        if (!asyncToggled)
+            asyncToggled = true;
+    }
+
+    /**
      * Prints memory stats.
      */
     public void printMemoryStats() {
@@ -2534,6 +2549,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Put future.
      */
     public IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate filter) {
+        A.notNull(key, "key", val, "val");
+
         final boolean statsEnabled = ctx.config().isStatisticsEnabled();
 
         final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -2554,8 +2571,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val,
         @Nullable final CacheEntryPredicate filter) {
-        A.notNull(key, "key", val, "val");
-
         if (keyCheck)
             validateCacheKey(key);
 
@@ -4592,6 +4607,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Failed future if waiting was interrupted.
      */
     @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() {
+        if (!asyncToggled)
+            return null;
+
         try {
             if (asyncOpsSem != null)
                 asyncOpsSem.acquire();
@@ -4610,7 +4628,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * Releases asynchronous operations permit, if limited.
      */
     protected void asyncOpRelease() {
-        if (asyncOpsSem != null)
+        if (asyncOpsSem != null && asyncToggled)
             asyncOpsSem.release();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index f87fa1d..b9e6e82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -334,6 +334,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteCache<K, V> withAsync() {
+        if (delegate instanceof GridCacheAdapter)
+            ((GridCacheAdapter)delegate).toggleAsync();
+
+        return super.withAsync();
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteCache<K, V> withSkipStore() {
         return skipStore();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 940c74e..0e60ff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -613,8 +613,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) {
-        A.notNull(key, "key", val, "val");
-
         return updateAsync0(
             key,
             val,
@@ -814,6 +812,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("unchecked")
     protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) {
+        if (!asyncToggled)
+            return op.apply();
+
         IgniteInternalFuture<T> fail = asyncOpAcquire();
 
         if (fail != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index a419887..bc16ff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1585,6 +1585,9 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
      */
     @SuppressWarnings("unchecked")
     protected IgniteInternalFuture asyncOp(final Callable<?> op) {
+        if (!asyncToggled)
+            return ctx.closures().callLocalSafe(op);
+
         IgniteInternalFuture fail = asyncOpAcquire();
 
         if (fail != null)