You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/01/31 15:12:48 UTC
[05/11] git commit: 6503 followup, make sure we send CompleteMessage when streaming is done.
6503 followup, make sure we send CompleteMessage when streaming is done.
Patch by yukim, reviewed by marcuse for CASSANDRA-6503
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1141cdb0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1141cdb0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1141cdb0
Branch: refs/heads/trunk
Commit: 1141cdb0c8122d3e990f29dc82576bededcbfd40
Parents: a8a12d9
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Jan 31 11:55:02 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Jan 31 11:55:02 2014 +0100
----------------------------------------------------------------------
.../org/apache/cassandra/streaming/StreamSession.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1141cdb0/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 4777995..7972183 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -144,6 +144,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
}
private volatile State state = State.INITIALIZED;
+ private volatile boolean completeSent = false;
/**
* Create new streaming session with the peer.
@@ -505,11 +506,15 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
{
if (state == State.WAIT_COMPLETE)
{
+ if (!completeSent)
+ {
+ handler.sendMessage(new CompleteMessage());
+ completeSent = true;
+ }
closeSession(State.COMPLETE);
}
else
{
- handler.sendMessage(new CompleteMessage());
state(State.WAIT_COMPLETE);
}
}
@@ -594,12 +599,18 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
{
if (state == State.WAIT_COMPLETE)
{
+ if (!completeSent)
+ {
+ handler.sendMessage(new CompleteMessage());
+ completeSent = true;
+ }
closeSession(State.COMPLETE);
}
else
{
// notify peer that this session is completed
handler.sendMessage(new CompleteMessage());
+ completeSent = true;
state(State.WAIT_COMPLETE);
}
}