You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/03/27 22:04:22 UTC

[5/7] git commit: Add max_streaming_retries instead of relying solely on the FD. Patch by brandonwilliams and yukim, reviewed by brandonwilliams for CASSANDRA-4051

Add max_streaming_retries instead of relying solely on the failure detector (FD).
Patch by brandonwilliams and yukim, reviewed by brandonwilliams for
CASSANDRA-4051


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3ee8682e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3ee8682e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3ee8682e

Branch: refs/heads/trunk
Commit: 3ee8682eb8e32f04979f44984911cf3547ead206
Parents: e3c4c09
Author: Brandon Williams <br...@apache.org>
Authored: Tue Mar 27 15:00:41 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Mar 27 15:00:41 2012 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/config/Config.java   |    2 +
 .../cassandra/config/DatabaseDescriptor.java       |    7 +-
 .../org/apache/cassandra/dht/RangeStreamer.java    |    7 +-
 .../apache/cassandra/io/sstable/SSTableLoader.java |    6 +-
 .../apache/cassandra/service/StorageService.java   |   17 ++-
 .../cassandra/streaming/AbstractStreamSession.java |  112 +++++++++++++++
 .../cassandra/streaming/IStreamCallback.java       |   36 +++++
 .../org/apache/cassandra/streaming/StreamIn.java   |    4 +-
 .../cassandra/streaming/StreamInSession.java       |   68 +++++----
 .../org/apache/cassandra/streaming/StreamOut.java  |    2 +-
 .../cassandra/streaming/StreamOutSession.java      |   88 ++----------
 .../apache/cassandra/streaming/StreamReply.java    |    1 +
 .../streaming/StreamReplyVerbHandler.java          |    5 +-
 .../cassandra/streaming/StreamingRepairTask.java   |   22 ++--
 14 files changed, 244 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index d875584..1131721 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -88,6 +88,8 @@ public class Config
     public Integer compaction_throughput_mb_per_sec = 16;
     public Boolean multithreaded_compaction = false;
 
+    public Integer max_streaming_retries = 3;
+
     public Integer stream_throughput_outbound_megabits_per_sec;
 
     public String[] data_file_directories;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 439c958..60b4724 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -648,11 +648,16 @@ public class DatabaseDescriptor
         return System.getProperty("cassandra.replace_token", null);
     }
 
-   public static String getClusterName()
+    public static String getClusterName()
     {
         return conf.cluster_name;
     }
 
+    public static int getMaxStreamingRetries()
+    {
+        return conf.max_streaming_retries;
+    }
+
     public static String getJobJarLocation()
     {
         return conf.job_jar_file_location;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 47931f8..dac05cf 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import org.apache.cassandra.streaming.IStreamCallback;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -219,15 +220,17 @@ public class RangeStreamer
             final InetAddress source = entry.getValue().getKey();
             Collection<Range<Token>> ranges = entry.getValue().getValue();
             /* Send messages to respective folks to stream data over to me */
-            Runnable callback = new Runnable()
+            IStreamCallback callback = new IStreamCallback()
             {
-                public void run()
+                public void onSuccess()
                 {
                     latch.countDown();
                     if (logger.isDebugEnabled())
                         logger.debug(String.format("Removed %s/%s as a %s source; remaining is %s",
                                      source, table, opType, latch.getCount()));
                 }
+
+                public void onFailure() {}
             };
             if (logger.isDebugEnabled())
                 logger.debug("" + opType + "ing from " + source + " ranges " + StringUtils.join(ranges, ", "));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 1ee7a2f..85b5146 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -206,7 +206,7 @@ public class SSTableLoader
         return builder.toString();
     }
 
-    private class CountDownCallback implements Runnable
+    private class CountDownCallback implements IStreamCallback
     {
         private final InetAddress endpoint;
         private final CountDownLatch latch;
@@ -217,7 +217,7 @@ public class SSTableLoader
             this.endpoint = endpoint;
         }
 
-        public void run()
+        public void onSuccess()
         {
             latch.countDown();
             outputHandler.debug(String.format("Streaming session to %s completed (waiting on %d outstanding sessions)", endpoint, latch.getCount()));
@@ -226,6 +226,8 @@ public class SSTableLoader
             if (latch.getCount() == 0)
                 client.stop();
         }
+
+        public void onFailure() {}
     }
 
     public interface OutputHandler

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 986fd81..84c0096 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1490,9 +1490,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             {
                 final InetAddress source = entry.getKey();
                 Collection<Range<Token>> ranges = entry.getValue();
-                final Runnable callback = new Runnable()
+                final IStreamCallback callback = new IStreamCallback()
                 {
-                    public void run()
+                    public void onSuccess()
                     {
                         synchronized (fetchSources)
                         {
@@ -1501,6 +1501,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                                 sendReplicationNotification(myAddress, notifyEndpoint);
                         }
                     }
+
+                    public void onFailure() {}
                 };
                 if (logger_.isDebugEnabled())
                     logger_.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
@@ -2799,9 +2801,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                 final Range<Token> range = endPointEntry.getKey();
                 final InetAddress newEndpoint = endPointEntry.getValue();
 
-                final Runnable callback = new Runnable()
+                final IStreamCallback callback = new IStreamCallback()
                 {
-                    public void run()
+                    public void onSuccess()
                     {
                         synchronized (pending)
                         {
@@ -2811,6 +2813,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                                 latch.countDown();
                         }
                     }
+                    public void onFailure() {}
                 };
 
                 StageManager.getStage(Stage.STREAM).execute(new Runnable()
@@ -2852,15 +2855,17 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             {
                 Collection<Range<Token>> toFetch = endpointWithRanges.get(source);
 
-                final Runnable callback = new Runnable()
+                final IStreamCallback callback = new IStreamCallback()
                 {
-                    public void run()
+                    public void onSuccess()
                     {
                         pending.remove(source);
 
                         if (pending.isEmpty())
                             latch.countDown();
                     }
+
+                    public void onFailure() {}
                 };
 
                 if (logger_.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
new file mode 100644
index 0000000..1938e3d
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.*;
+import org.apache.cassandra.utils.Pair;
+
+public abstract class AbstractStreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractStreamSession.class);
+
+    protected String table; // may be null: StreamInSession passes null to this constructor
+    protected Pair<InetAddress, Long> context; // left = peer host, right = session id
+    protected final IStreamCallback callback; // may be null; close() checks before notifying
+    private final AtomicBoolean isClosed = new AtomicBoolean(false); // makes close() take effect at most once
+
+    protected AbstractStreamSession(String table, Pair<InetAddress, Long> context, IStreamCallback callback)
+    {
+        this.table = table;
+        this.context = context;
+        this.callback = callback;
+        Gossiper.instance.register(this); // NOTE(review): 'this' is published before subclass constructors finish; confirm no listener callback can arrive mid-construction
+        FailureDetector.instance.registerFailureDetectionEventListener(this);
+    }
+
+    public long getSessionId()
+    {
+        return context.right;
+    }
+
+    public InetAddress getHost()
+    {
+        return context.left;
+    }
+
+    public void close(boolean success) // only the first call runs closeInternal, unregisters listeners and notifies the callback
+    {
+        if (!isClosed.compareAndSet(false, true))
+        {
+            logger.debug("Stream session {} already closed", getSessionId());
+            return;
+        }
+
+        closeInternal(success);
+
+        Gossiper.instance.unregister(this);
+        FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+
+        logger.debug("closing with status {}", success);
+        if (callback != null)
+        {
+            if (success)
+                callback.onSuccess();
+            else
+                callback.onFailure();
+        }
+    }
+
+    protected abstract void closeInternal(boolean success); // subclass-specific cleanup; invoked from close() before listeners are unregistered
+
+    public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE); // MAX_VALUE always passes the phi threshold check in convict()
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState epState)
+    {
+        convict(endpoint, Double.MAX_VALUE); // MAX_VALUE always passes the phi threshold check in convict()
+    }
+
+    public void convict(InetAddress endpoint, double phi)
+    {
+        if (!endpoint.equals(getHost())) // only failures of this session's peer matter
+            return;
+
+        // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost.
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+            return;
+
+        logger.error("Stream failed because {} died or was restarted/removed (streams may still be active "
+                      + "in background, but further streams won't be started)", endpoint);
+        close(false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/IStreamCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IStreamCallback.java b/src/java/org/apache/cassandra/streaming/IStreamCallback.java
new file mode 100644
index 0000000..f0d7754
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/IStreamCallback.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+/**
+ * Callback notified when a stream session closes; AbstractStreamSession.close invokes at most one of these methods, once.
+ */
+public interface IStreamCallback
+{
+    /**
+     * Called when the stream session has finished successfully.
+     */
+    public void onSuccess();
+
+    /**
+     * Called when the stream session is closed as failed (e.g. the peer was convicted, removed or restarted, or reported a session failure).
+     */
+    public void onFailure();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamIn.java b/src/java/org/apache/cassandra/streaming/StreamIn.java
index ce6f7ef..0621086 100644
--- a/src/java/org/apache/cassandra/streaming/StreamIn.java
+++ b/src/java/org/apache/cassandra/streaming/StreamIn.java
@@ -48,7 +48,7 @@ public class StreamIn
     private static Logger logger = LoggerFactory.getLogger(StreamIn.class);
 
     /** Request ranges for all column families in the given keyspace. */
-    public static void requestRanges(InetAddress source, String tableName, Collection<Range<Token>> ranges, Runnable callback, OperationType type)
+    public static void requestRanges(InetAddress source, String tableName, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
     {
         requestRanges(source, tableName, Table.open(tableName).getColumnFamilyStores(), ranges, callback, type);
     }
@@ -56,7 +56,7 @@ public class StreamIn
     /**
      * Request ranges to be transferred from specific CFs
      */
-    public static void requestRanges(InetAddress source, String tableName, Collection<ColumnFamilyStore> columnFamilies, Collection<Range<Token>> ranges, Runnable callback, OperationType type)
+    public static void requestRanges(InetAddress source, String tableName, Collection<ColumnFamilyStore> columnFamilies, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
     {
         assert ranges.size() > 0;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 2e25436..e662a49 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -25,42 +25,40 @@ import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
-import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.OutboundTcpConnection;
 import org.apache.cassandra.utils.Pair;
 
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /** each context gets its own StreamInSession. So there may be >1 Session per host */
-public class StreamInSession
+public class StreamInSession extends AbstractStreamSession
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);
 
     private static ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>();
 
     private final Set<PendingFile> files = new NonBlockingHashSet<PendingFile>();
-    private final Pair<InetAddress, Long> context;
-    private final Runnable callback;
-    private String table;
     private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
     private PendingFile current;
     private Socket socket;
+    private volatile int retries;
 
-    private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
+    private StreamInSession(Pair<InetAddress, Long> context, IStreamCallback callback)
     {
-        this.context = context;
-        this.callback = callback;
+        super(null, context, callback);
     }
 
-    public static StreamInSession create(InetAddress host, Runnable callback)
+    public static StreamInSession create(InetAddress host, IStreamCallback callback)
     {
         Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, System.nanoTime());
         StreamInSession session = new StreamInSession(context, callback);
@@ -76,9 +74,7 @@ public class StreamInSession
         {
             StreamInSession possibleNew = new StreamInSession(context, null);
             if ((session = sessions.putIfAbsent(context, possibleNew)) == null)
-            {
                 session = possibleNew;
-            }
         }
         return session;
     }
@@ -126,8 +122,16 @@ public class StreamInSession
 
     public void retry(PendingFile remoteFile) throws IOException
     {
+        retries++;
+        if (retries > DatabaseDescriptor.getMaxStreamingRetries())
+        {
+            logger.error(String.format("Failed streaming session %d from %s while receiving %s", getSessionId(), getHost().toString(), current),
+                         new IllegalStateException("Too many retries for " + remoteFile));
+            close(false); // close(), not closeInternal(): also marks the session closed, unregisters the gossip/FD listeners and fires callback.onFailure()
+            return;
+        }
         StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY);
-        logger.info("Streaming of file {} from {} failed: requesting a retry.", remoteFile, this);
+        logger.info("Streaming of file {} for {} failed: requesting a retry.", remoteFile, this);
         sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
     }
 
@@ -136,7 +140,6 @@ public class StreamInSession
         OutboundTcpConnection.write(message, String.valueOf(getSessionId()), new DataOutputStream(socket.getOutputStream()));
     }
 
-
     public void closeIfFinished() throws IOException
     {
         if (files.isEmpty())
@@ -151,7 +154,7 @@ public class StreamInSession
                     // Acquire the reference (for secondary index building) before submitting the index build,
                     // so it can't get compacted out of existence in between
                     if (!sstable.acquireReference())
-                        throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transfered");
+                        throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred");
 
                     ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
                     cfs.addSSTable(sstable);
@@ -189,20 +192,25 @@ public class StreamInSession
                     socket.close();
             }
 
-            if (callback != null)
-                callback.run();
-            sessions.remove(context);
+            close(true);
         }
     }
 
-    public long getSessionId()
+    protected void closeInternal(boolean success)
     {
-        return context.right;
-    }
-
-    public InetAddress getHost()
-    {
-        return context.left;
+        sessions.remove(context);
+        if (!success && FailureDetector.instance.isAlive(getHost()))
+        {
+            try
+            {
+                StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FAILURE);
+                MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
+            }
+            catch (IOException ex)
+            {
+                logger.error("Error sending streaming session failure notification to " + getHost(), ex);
+            }
+        }
     }
 
     /** query method to determine which hosts are streaming to this node. */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
index 710c9ed..d3f37b5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -81,7 +81,7 @@ public class StreamOut
     /**
      * Stream the given ranges to the target endpoint from each CF in the given keyspace.
     */
-    public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, Runnable callback, OperationType type)
+    public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
     {
         StreamOutSession session = StreamOutSession.create(table.name, target, callback);
         transferRanges(session, table.getColumnFamilyStores(), ranges, type);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamOutSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
index 80629b8..7a53c7c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
@@ -22,29 +22,27 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.StringUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.*;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Pair;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
  * This class manages the streaming of multiple files one after the other.
-*/
-public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+ */
+public class StreamOutSession extends AbstractStreamSession
 {
-    private static final Logger logger = LoggerFactory.getLogger( StreamOutSession.class );
+    private static final Logger logger = LoggerFactory.getLogger(StreamOutSession.class);
 
     // one host may have multiple stream sessions.
     private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession> streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>();
 
-    public static StreamOutSession create(String table, InetAddress host, Runnable callback)
+    public static StreamOutSession create(String table, InetAddress host, IStreamCallback callback)
     {
         return create(table, host, System.nanoTime(), callback);
     }
@@ -54,7 +52,7 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur
         return create(table, host, sessionId, null);
     }
 
-    public static StreamOutSession create(String table, InetAddress host, long sessionId, Runnable callback)
+    public static StreamOutSession create(String table, InetAddress host, long sessionId, IStreamCallback callback)
     {
         Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId);
         StreamOutSession session = new StreamOutSession(table, context, callback);
@@ -69,29 +67,11 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur
 
     private final Map<String, PendingFile> files = new NonBlockingHashMap<String, PendingFile>();
 
-    public final String table;
-    private final Pair<InetAddress, Long> context;
-    private final Runnable callback;
     private volatile String currentFile;
-    private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-    private StreamOutSession(String table, Pair<InetAddress, Long> context, Runnable callback)
-    {
-        this.table = table;
-        this.context = context;
-        this.callback = callback;
-        Gossiper.instance.register(this);
-        FailureDetector.instance.registerFailureDetectionEventListener(this);
-    }
-
-    public InetAddress getHost()
-    {
-        return context.left;
-    }
 
-    public long getSessionId()
+    private StreamOutSession(String table, Pair<InetAddress, Long> context, IStreamCallback callback)
     {
-        return context.right;
+        super(table, context, callback);
     }
 
     public void addFilesToStream(List<PendingFile> pendingFiles)
@@ -127,33 +107,12 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur
             streamFile(iter.next());
     }
 
-    public void close()
+    protected void closeInternal(boolean success)
     {
-        close(true);
-    }
-
-    private void close(boolean success)
-    {
-        // Though unlikely, it is possible for close to be called multiple
-        // time, if the endpoint die at the exact wrong time for instance.
-        if (!isClosed.compareAndSet(false, true))
-        {
-            logger.debug("StreamOutSession {} already closed", getSessionId());
-            return;
-        }
-
-        Gossiper.instance.unregister(this);
-        FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-
         // Release reference on last file (or any uncompleted ones)
         for (PendingFile file : files.values())
             file.sstable.releaseReference();
         streams.remove(context);
-        // Instead of just not calling the callback on failure, we could have
-        // allow to register a specific callback for failures, but we leave
-        // that to a future ticket (likely CASSANDRA-3112)
-        if (callback != null && success)
-            callback.run();
     }
 
     /** convenience method for use when testing */
@@ -204,33 +163,4 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur
         logger.debug("Files are {}", StringUtils.join(files.values(), ","));
         MessagingService.instance().stream(header, getHost());
     }
-
-    public void onJoin(InetAddress endpoint, EndpointState epState) {}
-    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
-    public void onAlive(InetAddress endpoint, EndpointState state) {}
-    public void onDead(InetAddress endpoint, EndpointState state) {}
-
-    public void onRemove(InetAddress endpoint)
-    {
-        convict(endpoint, Double.MAX_VALUE);
-    }
-
-    public void onRestart(InetAddress endpoint, EndpointState epState)
-    {
-        convict(endpoint, Double.MAX_VALUE);
-    }
-
-    public void convict(InetAddress endpoint, double phi)
-    {
-        if (!endpoint.equals(getHost()))
-            return;
-
-        // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost.
-        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
-            return;
-
-        logger.error("StreamOutSession {} failed because {} died or was restarted/removed (streams may still be active "
-                + "in background, but further streams won't be started)", endpoint);
-        close(false);
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReply.java b/src/java/org/apache/cassandra/streaming/StreamReply.java
index f97cfee..c9b82a7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReply.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReply.java
@@ -37,6 +37,7 @@ class StreamReply implements MessageProducer
         FILE_FINISHED,
         FILE_RETRY,
         SESSION_FINISHED,
+        SESSION_FAILURE,
     }
 
     public static final IVersionedSerializer<StreamReply> serializer = new FileStatusSerializer();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
index ddc0690..e839fd5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
@@ -65,7 +65,10 @@ public class StreamReplyVerbHandler implements IVerbHandler
                     session.retry();
                     break;
                 case SESSION_FINISHED:
-                    session.close();
+                    session.close(true);
+                    break;
+                case SESSION_FAILURE:
+                    session.close(false);
                     break;
                 default:
                     throw new RuntimeException("Cannot handle FileStatus.Action: " + reply.action);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index ab341ed..af9a059 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -63,9 +63,9 @@ public class StreamingRepairTask implements Runnable
     private final String tableName;
     private final String cfName;
     private final Collection<Range<Token>> ranges;
-    private final Runnable callback;
+    private final IStreamCallback callback;
 
-    private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, Runnable callback)
+    private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, IStreamCallback callback)
     {
         this.id = id;
         this.owner = owner;
@@ -143,14 +143,14 @@ public class StreamingRepairTask implements Runnable
         }
     }
 
-    private static Runnable makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
+    private static IStreamCallback makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
     {
-        return new Runnable()
+        return new IStreamCallback()
         {
             // we expect one callback for the receive, and one for the send
             private final AtomicInteger outstanding = new AtomicInteger(2);
 
-            public void run()
+            public void onSuccess()
             {
                 if (outstanding.decrementAndGet() > 0)
                     // waiting on more calls
@@ -165,18 +165,20 @@ public class StreamingRepairTask implements Runnable
                     throw new IOError(e);
                 }
             }
+
+            public void onFailure() {}
         };
     }
 
     // wrap a given callback so as to unregister the streaming repair task on completion
-    private static Runnable wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask)
+    private static IStreamCallback wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask)
     {
-        return new Runnable()
+        return new IStreamCallback()
         {
             // we expect one callback for the receive, and one for the send
             private final AtomicInteger outstanding = new AtomicInteger(isLocalTask ? 2 : 1);
 
-            public void run()
+            public void onSuccess()
             {
                 if (outstanding.decrementAndGet() > 0)
                     // waiting on more calls
@@ -186,6 +188,8 @@ public class StreamingRepairTask implements Runnable
                 if (callback != null)
                     callback.run();
             }
+
+            public void onFailure() {}
         };
     }
 
@@ -253,7 +257,7 @@ public class StreamingRepairTask implements Runnable
 
             logger.info(String.format("[streaming task #%s] task succeeded", task.id));
             if (task.callback != null)
-                task.callback.run();
+                task.callback.onSuccess();
         }
 
         private static void reply(InetAddress remote, UUID taskid) throws IOException