You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/07 15:37:37 UTC

ignite git commit: ignite-comm-balance-master

Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-balance-master 18d0d0d94 -> d8ce5afc5


ignite-comm-balance-master


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8ce5afc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8ce5afc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8ce5afc

Branch: refs/heads/ignite-comm-balance-master
Commit: d8ce5afc5d71225131da2a1a3c7ed4b1d22c9549
Parents: 18d0d0d
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 7 18:37:30 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 7 18:37:30 2016 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamProcessor.java       | 22 +++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d8ce5afc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 32fda87..fee4dd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -328,6 +328,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
             if (!allowOverwrite)
                 cctx.topology().readLock();
 
+            GridDhtTopologyFuture topWaitFut = null;
+
             try {
                 GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
 
@@ -352,19 +354,25 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
 
                     waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(topVer);
                 }
-                else {
-                    fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
-                            localUpdate(nodeId, req, updater, topic);
-                        }
-                    });
-                }
+                else
+                    topWaitFut = fut;
             }
             finally {
                 if (!allowOverwrite)
                     cctx.topology().readUnlock();
             }
 
+            if (topWaitFut != null) {
+                // Need call 'listen' after topology read lock is released.
+                topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+                        localUpdate(nodeId, req, updater, topic);
+                    }
+                });
+
+                return;
+            }
+
             if (job != null) {
                 try {
                     job.call();