You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/09/19 17:35:36 UTC

svn commit: r1172667 - in /cassandra/trunk: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/streaming/

Author: slebresne
Date: Mon Sep 19 15:35:35 2011
New Revision: 1172667

URL: http://svn.apache.org/viewvc?rev=1172667&view=rev
Log:
merge from 1.0

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:35:35 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1171096,1172026,1172591
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1171099,1172598
-/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1171098,1172027,1172597
+/cassandra/branches/cassandra-1.0:1167085-1171099,1172598,1172665
+/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1171098,1172027,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1172667&r1=1172666&r2=1172667&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Sep 19 15:35:35 2011
@@ -5,6 +5,9 @@
 1.0.0-rc1
  * Fix counting CFMetadata towards Memtable liveRatio (CASSANDRA-3023)
  * Log message when a full repair operation completes (CASSANDRA-3207)
+ * Fix streamOutSession keeping sstables references forever if the remote end
+   dies (CASSANDRA-3216)
+
 
 1.0.0-beta1
  * removed binarymemtable (CASSANDRA-2692)

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:35:35 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1171096,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1171099,1172598
-/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1171098,1172027,1172597
+/cassandra/branches/cassandra-1.0/contrib:1167085-1171099,1172598,1172665
+/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1171098,1172027,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:35:35 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1171096,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1171099,1172598
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1171098,1172027,1172597
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1171099,1172598,1172665
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1171098,1172027,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:35:35 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1171096,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1171099,1172598
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1171098,1172027,1172597
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1171099,1172598,1172665
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1171098,1172027,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:35:35 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1171096,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1171099,1172598
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1171098,1172027,1172597
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1171099,1172598,1172665
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1171098,1172027,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:35:35 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1171096,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1171099,1172598
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1171098,1172027,1172597
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1171099,1172598,1172665
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1171098,1172027,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:35:35 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1171096,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1171099,1172598
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1171098,1172027,1172597
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1171099,1172598,1172665
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1171098,1172027,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=1172667&r1=1172666&r2=1172667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutSession.java Mon Sep 19 15:35:35 2011
@@ -22,11 +22,14 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.*;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Pair;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -34,7 +37,7 @@ import org.cliffc.high_scale_lib.NonBloc
 /**
  * This class manages the streaming of multiple files one after the other.
 */
-public class StreamOutSession
+public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
 {
     private static final Logger logger = LoggerFactory.getLogger( StreamOutSession.class );
 
@@ -70,12 +73,15 @@ public class StreamOutSession
     private final Pair<InetAddress, Long> context;
     private final Runnable callback;
     private volatile String currentFile;
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
     private StreamOutSession(String table, Pair<InetAddress, Long> context, Runnable callback)
     {
         this.table = table;
         this.context = context;
         this.callback = callback;
+        Gossiper.instance.register(this);
+        FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
 
     public InetAddress getHost()
@@ -123,11 +129,30 @@ public class StreamOutSession
 
     public void close()
     {
-        // Release reference on last file
+        close(true);
+    }
+
+    private void close(boolean success)
+    {
+        // Though unlikely, it is possible for close to be called multiple
+        // time, if the endpoint die at the exact wrong time for instance.
+        if (!isClosed.compareAndSet(false, true))
+        {
+            logger.debug("StreamOutSession {} already closed", getSessionId());
+            return;
+        }
+
+        Gossiper.instance.unregister(this);
+        FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+
+        // Release reference on last file (or any uncompleted ones)
         for (PendingFile file : files.values())
             file.sstable.releaseReference();
         streams.remove(context);
-        if (callback != null)
+        // Instead of just not calling the callback on failure, we could have
+        // allow to register a specific callback for failures, but we leave
+        // that to a future ticket (likely CASSANDRA-3112)
+        if (callback != null && success)
             callback.run();
     }
 
@@ -179,4 +204,32 @@ public class StreamOutSession
         logger.debug("Files are {}", StringUtils.join(files.values(), ","));
         MessagingService.instance().stream(header, getHost());
     }
+
+    public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState epState)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void convict(InetAddress endpoint, double phi)
+    {
+        if (!endpoint.equals(getHost()))
+            return;
+
+        // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost.
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+            return;
+
+        logger.error("StreamOutSession {} failed because {} died or was restarted/removed", endpoint);
+        close(false);
+    }
 }