You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/04/11 22:08:39 UTC
[5/5] git commit: Add failure callbacks for outgoing streams. Patch by Yuki Morishita, reviewed by brandonwilliams for CASSANDRA-4051
Add failure callbacks for outgoing streams.
Patch by Yuki Morishita, reviewed by brandonwilliams for CASSANDRA-4051
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/34c1fc0b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/34c1fc0b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/34c1fc0b
Branch: refs/heads/cassandra-1.1
Commit: 34c1fc0b7cbdf568ec7869a564fe614f96dcbe9b
Parents: 97aa922
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 11 15:06:45 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 11 15:06:45 2012 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/dht/RangeStreamer.java | 6 ++++-
.../apache/cassandra/io/sstable/SSTableLoader.java | 6 ++++-
.../apache/cassandra/service/StorageService.java | 19 ++++++++++++--
.../apache/cassandra/streaming/FileStreamTask.java | 7 +++++
.../cassandra/streaming/StreamInSession.java | 12 ++++++++-
5 files changed, 43 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index dac05cf..6f7beb0 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -230,7 +230,11 @@ public class RangeStreamer
source, table, opType, latch.getCount()));
}
- public void onFailure() {}
+ public void onFailure()
+ {
+ logger.warn("Streaming from " + source + " failed");
+ onSuccess(); // calling onSuccess for latch countdown
+ }
};
if (logger.isDebugEnabled())
logger.debug("" + opType + "ing from " + source + " ranges " + StringUtils.join(ranges, ", "));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 85b5146..79259ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -227,7 +227,11 @@ public class SSTableLoader
client.stop();
}
- public void onFailure() {}
+ public void onFailure()
+ {
+ outputHandler.output(String.format("Streaming session to %s failed", endpoint));
+ onSuccess(); // call onSuccess for latch countdown
+ }
}
public interface OutputHandler
http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 84c0096..88b9c19 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1502,7 +1502,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
}
- public void onFailure() {}
+ public void onFailure()
+ {
+ logger_.warn("Streaming from " + source + " failed");
+ onSuccess(); // calling onSuccess to send notification
+ }
};
if (logger_.isDebugEnabled())
logger_.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
@@ -2813,7 +2817,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
latch.countDown();
}
}
- public void onFailure() {}
+
+ public void onFailure()
+ {
+ logger_.warn("Streaming to " + endPointEntry + " failed");
+ onSuccess(); // calling onSuccess for latch countdown
+ }
};
StageManager.getStage(Stage.STREAM).execute(new Runnable()
@@ -2865,7 +2874,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
latch.countDown();
}
- public void onFailure() {}
+ public void onFailure()
+ {
+ logger_.warn("Streaming from " + source + " failed");
+ onSuccess(); // calling onSuccess for latch countdown
+ }
};
if (logger_.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index babdc4e..8ff2b83 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -106,6 +106,13 @@ public class FileStreamTask extends WrappedRunnable
logger.info("Finished streaming session to {}", to);
}
}
+ catch (IOException e)
+ {
+ StreamOutSession session = StreamOutSession.get(to, header.sessionId);
+ if (session != null)
+ session.close(false);
+ throw e;
+ }
finally
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index e662a49..a5e08f0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -127,12 +127,20 @@ public class StreamInSession extends AbstractStreamSession
{
logger.error(String.format("Failed streaming session %d from %s while receiving %s", getSessionId(), getHost().toString(), current),
new IllegalStateException("Too many retries for " + remoteFile));
- closeInternal(false);
+ close(false);
return;
}
StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY);
logger.info("Streaming of file {} for {} failed: requesting a retry.", remoteFile, this);
- sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
+ try
+ {
+ sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
+ }
+ catch (IOException e)
+ {
+ logger.error("Sending retry message failed, closing session.", e);
+ close(false);
+ }
}
public void sendMessage(Message message) throws IOException