You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/01/10 16:46:34 UTC
ignite git commit: completing from user thread
Repository: ignite
Updated Branches:
refs/heads/ignite-comm-balance-master 312706eaa -> 69eac481d
completing from user thread
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69eac481
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69eac481
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69eac481
Branch: refs/heads/ignite-comm-balance-master
Commit: 69eac481d2002f5929fa07adc8c9da0178f2f4ea
Parents: 312706e
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue Jan 10 19:46:20 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Jan 10 19:46:20 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 15 ++++++
.../GridNearAtomicAbstractUpdateFuture.java | 20 ++++++++
.../internal/util/future/GridFutureAdapter.java | 48 ++++++++++++++++++++
3 files changed, 83 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/69eac481/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 44221b1..a1992b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -54,6 +54,8 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -802,6 +804,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
+ if (msg.message() instanceof GridNearAtomicUpdateResponse) {
+ GridNearAtomicUpdateResponse res = (GridNearAtomicUpdateResponse)msg.message();
+
+ GridNearAtomicAbstractUpdateFuture f =
+ (GridNearAtomicAbstractUpdateFuture)ctx.cache().context().mvcc().atomicFuture(res.futureVersion());
+
+ f.completer(c);
+
+ f.unblockAllThreads();
+
+ return;
+ }
+
if (ctx.config().getStripedPoolSize() > 0 &&
plc == GridIoPolicy.SYSTEM_POOL &&
msg.partition() != Integer.MIN_VALUE
http://git-wip-us.apache.org/repos/asf/ignite/blob/69eac481/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 2fbabaa..fb4ca78 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -134,6 +134,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/** Operation result. */
protected GridCacheReturn opRes;
+ /** */
+ protected volatile Runnable completer;
+
/**
* Constructor.
*
@@ -217,6 +220,23 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
}
/**
+ * @param completer
+ */
+ public void completer(Runnable completer) {
+ this.completer = completer;
+ }
+
+ /** {@inheritDoc} */
+ protected Runnable clearCompleter() {
+ Runnable r = completer;
+
+ if (r != null)
+ completer = null;
+
+ return r;
+ }
+
+ /**
* @param topVer Topology version.
*/
protected abstract void map(AffinityTopologyVersion topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/69eac481/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 39b044d..dc2afd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -167,6 +167,20 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
}
/**
+ * @return Completer.
+ */
+ protected Runnable completer() {
+ return null;
+ }
+
+ /**
+ * @return Completer and clears it.
+ */
+ protected Runnable clearCompleter() {
+ return null;
+ }
+
+ /**
* Internal get routine.
*
* @param ignoreInterrupts Whether to ignore interrupts.
@@ -187,6 +201,14 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
for (; ; ) {
LockSupport.park();
+ Runnable r = clearCompleter();
+
+ if (r != null) {
+ r.run();
+
+ assert isDone();
+ }
+
if (isDone())
return resolve(state);
@@ -360,6 +382,32 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
}
}
+ public void unblockAllThreads() {
+ unblockFirstThread0(state);
+ }
+
+ /**
+ * @param waiter Head of waiters queue to unblock.
+ */
+ private void unblockFirstThread0(Object waiter) {
+ while (waiter != null) {
+ if (waiter instanceof Thread) {
+ LockSupport.unpark((Thread)waiter);
+
+ return;
+ }
+ else if (waiter.getClass() == WaitNode.class) {
+ WaitNode waitNode = (WaitNode) waiter;
+
+ unblockFirstThread0(waitNode.waiter);
+
+ waiter = waitNode.next;
+ }
+ else
+ return;
+ }
+ }
+
/** {@inheritDoc} */
@Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> newLsnr) {
Object res = registerWaiter(newLsnr);