You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/22 15:15:44 UTC

[04/50] [abbrv] ignite git commit: IGNITE-500 CacheLoadingConcurrentGridStartSelfTest fails: prevent 'localUpdate' execution while top read lock is held.

IGNITE-500 CacheLoadingConcurrentGridStartSelfTest fails: prevent 'localUpdate' execution while the topology read lock is held.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b83ec8e5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b83ec8e5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b83ec8e5

Branch: refs/heads/master
Commit: b83ec8e57c7c48f2baa4780cf3b2e46df075f3df
Parents: bc977d3
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 9 14:32:42 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 9 14:32:42 2016 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamProcessor.java       | 22 +++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b83ec8e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 32fda87..fee4dd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -328,6 +328,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
             if (!allowOverwrite)
                 cctx.topology().readLock();
 
+            GridDhtTopologyFuture topWaitFut = null;
+
             try {
                 GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
 
@@ -352,19 +354,25 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
 
                     waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(topVer);
                 }
-                else {
-                    fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
-                            localUpdate(nodeId, req, updater, topic);
-                        }
-                    });
-                }
+                else
+                    topWaitFut = fut;
             }
             finally {
                 if (!allowOverwrite)
                     cctx.topology().readUnlock();
             }
 
+            if (topWaitFut != null) {
+                // Need call 'listen' after topology read lock is released.
+                topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+                        localUpdate(nodeId, req, updater, topic);
+                    }
+                });
+
+                return;
+            }
+
             if (job != null) {
                 try {
                     job.call();