You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/07 15:37:37 UTC
ignite git commit: ignite-comm-balance-master
Repository: ignite
Updated Branches:
refs/heads/ignite-comm-balance-master 18d0d0d94 -> d8ce5afc5
ignite-comm-balance-master
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8ce5afc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8ce5afc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8ce5afc
Branch: refs/heads/ignite-comm-balance-master
Commit: d8ce5afc5d71225131da2a1a3c7ed4b1d22c9549
Parents: 18d0d0d
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 7 18:37:30 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 7 18:37:30 2016 +0300
----------------------------------------------------------------------
.../datastreamer/DataStreamProcessor.java | 22 +++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d8ce5afc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 32fda87..fee4dd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -328,6 +328,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
if (!allowOverwrite)
cctx.topology().readLock();
+ GridDhtTopologyFuture topWaitFut = null;
+
try {
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
@@ -352,19 +354,25 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(topVer);
}
- else {
- fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
- localUpdate(nodeId, req, updater, topic);
- }
- });
- }
+ else
+ topWaitFut = fut;
}
finally {
if (!allowOverwrite)
cctx.topology().readUnlock();
}
+ if (topWaitFut != null) {
+ // Need call 'listen' after topology read lock is released.
+ topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+ localUpdate(nodeId, req, updater, topic);
+ }
+ });
+
+ return;
+ }
+
if (job != null) {
try {
job.call();