You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/22 15:15:44 UTC
[04/50] [abbrv] ignite git commit: IGNITE-500 CacheLoadingConcurrentGridStartSelfTest fails: prevent 'localUpdate' execution while top read lock is held.
IGNITE-500 CacheLoadingConcurrentGridStartSelfTest fails: prevent 'localUpdate' execution while top read lock is held.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b83ec8e5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b83ec8e5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b83ec8e5
Branch: refs/heads/master
Commit: b83ec8e57c7c48f2baa4780cf3b2e46df075f3df
Parents: bc977d3
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 9 14:32:42 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 9 14:32:42 2016 +0300
----------------------------------------------------------------------
.../datastreamer/DataStreamProcessor.java | 22 +++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b83ec8e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 32fda87..fee4dd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -328,6 +328,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
if (!allowOverwrite)
cctx.topology().readLock();
+ GridDhtTopologyFuture topWaitFut = null;
+
try {
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
@@ -352,19 +354,25 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(topVer);
}
- else {
- fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
- localUpdate(nodeId, req, updater, topic);
- }
- });
- }
+ else
+ topWaitFut = fut;
}
finally {
if (!allowOverwrite)
cctx.topology().readUnlock();
}
+ if (topWaitFut != null) {
+ // Need call 'listen' after topology read lock is released.
+ topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+ localUpdate(nodeId, req, updater, topic);
+ }
+ });
+
+ return;
+ }
+
if (job != null) {
try {
job.call();