You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/04/11 22:08:39 UTC

[5/5] git commit: Add failure callbacks for outgoing streams. Patch by Yuki Morishita, reviewed by brandonwilliams for CASSANDRA-4051

Add failure callbacks for outgoing streams.
Patch by Yuki Morishita, reviewed by brandonwilliams for CASSANDRA-4051


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/34c1fc0b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/34c1fc0b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/34c1fc0b

Branch: refs/heads/cassandra-1.1
Commit: 34c1fc0b7cbdf568ec7869a564fe614f96dcbe9b
Parents: 97aa922
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 11 15:06:45 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 11 15:06:45 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/dht/RangeStreamer.java    |    6 ++++-
 .../apache/cassandra/io/sstable/SSTableLoader.java |    6 ++++-
 .../apache/cassandra/service/StorageService.java   |   19 ++++++++++++--
 .../apache/cassandra/streaming/FileStreamTask.java |    7 +++++
 .../cassandra/streaming/StreamInSession.java       |   12 ++++++++-
 5 files changed, 43 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index dac05cf..6f7beb0 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -230,7 +230,11 @@ public class RangeStreamer
                                      source, table, opType, latch.getCount()));
                 }
 
-                public void onFailure() {}
+                public void onFailure()
+                {
+                    logger.warn("Streaming from " + source + " failed");
+                    onSuccess(); // a failed stream must still count down the latch, so reuse onSuccess
+                }
             };
             if (logger.isDebugEnabled())
                 logger.debug("" + opType + "ing from " + source + " ranges " + StringUtils.join(ranges, ", "));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 85b5146..79259ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -227,7 +227,11 @@ public class SSTableLoader
                 client.stop();
         }
 
-        public void onFailure() {}
+        public void onFailure()
+        {
+            outputHandler.output(String.format("Streaming session to %s failed", endpoint));
+            onSuccess(); // a failed session must still count down the latch, so reuse onSuccess
+        }
     }
 
     public interface OutputHandler

http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 84c0096..88b9c19 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1502,7 +1502,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                         }
                     }
 
-                    public void onFailure() {}
+                    public void onFailure()
+                    {
+                        logger_.warn("Streaming from " + source + " failed");
+                        onSuccess(); // calling onSuccess to send notification
+                    }
                 };
                 if (logger_.isDebugEnabled())
                     logger_.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
@@ -2813,7 +2817,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                                 latch.countDown();
                         }
                     }
-                    public void onFailure() {}
+
+                    public void onFailure()
+                    {
+                        logger_.warn("Streaming to " + endPointEntry + " failed");
+                        onSuccess(); // a failed stream must still count down the latch, so reuse onSuccess
+                    }
                 };
 
                 StageManager.getStage(Stage.STREAM).execute(new Runnable()
@@ -2865,7 +2874,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                             latch.countDown();
                     }
 
-                    public void onFailure() {}
+                    public void onFailure()
+                    {
+                        logger_.warn("Streaming from " + source + " failed");
+                        onSuccess(); // a failed stream must still count down the latch, so reuse onSuccess
+                    }
                 };
 
                 if (logger_.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index babdc4e..8ff2b83 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -106,6 +106,13 @@ public class FileStreamTask extends WrappedRunnable
                 logger.info("Finished streaming session to {}", to);
             }
         }
+        catch (IOException e)
+        {
+            StreamOutSession session = StreamOutSession.get(to, header.sessionId);
+            if (session != null)
+                session.close(false);
+            throw e;
+        }
         finally
         {
             try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index e662a49..a5e08f0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -127,12 +127,20 @@ public class StreamInSession extends AbstractStreamSession
         {
             logger.error(String.format("Failed streaming session %d from %s while receiving %s", getSessionId(), getHost().toString(), current),
                          new IllegalStateException("Too many retries for " + remoteFile));
-            closeInternal(false);
+            close(false);
             return;
         }
         StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY);
         logger.info("Streaming of file {} for {} failed: requesting a retry.", remoteFile, this);
-        sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
+        try
+        {
+            sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
+        }
+        catch (IOException e)
+        {
+            logger.error("Sending retry message failed, closing session.", e);
+            close(false);
+        }
     }
 
     public void sendMessage(Message message) throws IOException