You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/01/31 15:12:48 UTC

[05/11] git commit: 6503 followup, make sure we send CompleteMessage when streaming is done.

6503 followup, make sure we send CompleteMessage when streaming is done.

Patch by yukim, reviewed by marcuse for CASSANDRA-6503


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1141cdb0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1141cdb0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1141cdb0

Branch: refs/heads/trunk
Commit: 1141cdb0c8122d3e990f29dc82576bededcbfd40
Parents: a8a12d9
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Jan 31 11:55:02 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Jan 31 11:55:02 2014 +0100

----------------------------------------------------------------------
 .../org/apache/cassandra/streaming/StreamSession.java  | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1141cdb0/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 4777995..7972183 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -144,6 +144,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
     }
 
     private volatile State state = State.INITIALIZED;
+    private volatile boolean completeSent = false;
 
     /**
      * Create new streaming session with the peer.
@@ -505,11 +506,15 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
     {
         if (state == State.WAIT_COMPLETE)
         {
+            if (!completeSent)
+            {
+                handler.sendMessage(new CompleteMessage());
+                completeSent = true;
+            }
             closeSession(State.COMPLETE);
         }
         else
         {
-            handler.sendMessage(new CompleteMessage());
             state(State.WAIT_COMPLETE);
         }
     }
@@ -594,12 +599,18 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
         {
             if (state == State.WAIT_COMPLETE)
             {
+                if (!completeSent)
+                {
+                    handler.sendMessage(new CompleteMessage());
+                    completeSent = true;
+                }
                 closeSession(State.COMPLETE);
             }
             else
             {
                 // notify peer that this session is completed
                 handler.sendMessage(new CompleteMessage());
+                completeSent = true;
                 state(State.WAIT_COMPLETE);
             }
         }