You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/01/10 16:46:34 UTC

ignite git commit: completing from user thread

Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-balance-master 312706eaa -> 69eac481d


completing from user thread


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69eac481
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69eac481
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69eac481

Branch: refs/heads/ignite-comm-balance-master
Commit: 69eac481d2002f5929fa07adc8c9da0178f2f4ea
Parents: 312706e
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue Jan 10 19:46:20 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Jan 10 19:46:20 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 15 ++++++
 .../GridNearAtomicAbstractUpdateFuture.java     | 20 ++++++++
 .../internal/util/future/GridFutureAdapter.java | 48 ++++++++++++++++++++
 3 files changed, 83 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/69eac481/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 44221b1..a1992b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -54,6 +54,8 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -802,6 +804,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             return;
         }
 
+        if (msg.message() instanceof GridNearAtomicUpdateResponse) {
+            GridNearAtomicUpdateResponse res = (GridNearAtomicUpdateResponse)msg.message();
+
+            GridNearAtomicAbstractUpdateFuture f =
+                (GridNearAtomicAbstractUpdateFuture)ctx.cache().context().mvcc().atomicFuture(res.futureVersion());
+
+            f.completer(c);
+
+            f.unblockAllThreads();
+
+            return;
+        }
+
         if (ctx.config().getStripedPoolSize() > 0 &&
             plc == GridIoPolicy.SYSTEM_POOL &&
             msg.partition() != Integer.MIN_VALUE

http://git-wip-us.apache.org/repos/asf/ignite/blob/69eac481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 2fbabaa..fb4ca78 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -134,6 +134,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     /** Operation result. */
     protected GridCacheReturn opRes;
 
+    /** Completer: set via {@link #completer(Runnable)}, taken and cleared by {@link #clearCompleter()}. */
+    protected volatile Runnable completer;
+
     /**
      * Constructor.
      *
@@ -217,6 +220,23 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     }
 
     /**
+     * @param completer Closure that a thread parked on this future runs after wake-up to complete it.
+     */
+    public void completer(Runnable completer) {
+        this.completer = completer;
+    }
+
+    /** {@inheritDoc} */
+    protected Runnable clearCompleter() {
+        Runnable r = completer;
+
+        if (r != null)
+            completer = null;
+
+        return r;
+    }
+
+    /**
      * @param topVer Topology version.
      */
     protected abstract void map(AffinityTopologyVersion topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/69eac481/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 39b044d..dc2afd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -167,6 +167,20 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
     }
 
     /**
+     * @return Completer, always {@code null} in this base implementation.
+     */
+    protected Runnable completer() {
+        return null;
+    }
+
+    /**
+     * @return Completer, which gets cleared, or {@code null}; always {@code null} in this base implementation.
+     */
+    protected Runnable clearCompleter() {
+        return null;
+    }
+
+    /**
      * Internal get routine.
      *
      * @param ignoreInterrupts Whether to ignore interrupts.
@@ -187,6 +201,14 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
             for (; ; ) {
                 LockSupport.park();
 
+                Runnable r = clearCompleter();
+
+                if (r != null) {
+                    r.run();
+
+                    assert isDone();
+                }
+
                 if (isDone())
                     return resolve(state);
 
@@ -360,6 +382,32 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
         }
     }
 
+    public void unblockAllThreads() {
+        unblockFirstThread0(state); // Despite the "First" in its name, the helper walks the whole waiters chain.
+    }
+
+    /**
+     * @param waiter Head of waiters queue; every {@link Thread} reachable through the chain is unparked.
+     */
+    private void unblockFirstThread0(Object waiter) {
+        while (waiter != null) {
+            if (waiter instanceof Thread) {
+                LockSupport.unpark((Thread)waiter);
+
+                return; // A Thread carries no link to a further waiter, so traversal stops here.
+            }
+            else if (waiter.getClass() == WaitNode.class) {
+                WaitNode waitNode = (WaitNode) waiter;
+
+                unblockFirstThread0(waitNode.waiter); // Wake this node's own waiter first...
+
+                waiter = waitNode.next; // ...then continue with the rest of the chain.
+            }
+            else
+                return; // Neither a thread nor a WaitNode: nothing to unpark.
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> newLsnr) {
         Object res = registerWaiter(newLsnr);