You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/04/16 18:52:15 UTC
[2/3] git commit: sstableloader detects and reports failures. Patch
by brandonwilliams reviewed by Yuki Morishita for CASSANDRA-4146
sstableloader detects and reports failures.
Patch by brandonwilliams reviewed by Yuki Morishita for CASSANDRA-4146
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67b340be
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67b340be
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67b340be
Branch: refs/heads/trunk
Commit: 67b340becd1aa369c71af10e5b76570e73809e2e
Parents: f57f1c0
Author: Brandon Williams <br...@apache.org>
Authored: Mon Apr 16 11:50:33 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Apr 16 11:50:33 2012 -0500
----------------------------------------------------------------------
.../apache/cassandra/io/sstable/SSTableLoader.java | 34 +++++++++++---
.../apache/cassandra/streaming/FileStreamTask.java | 6 +-
.../org/apache/cassandra/tools/BulkLoader.java | 6 +++
3 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67b340be/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 79259ec..c76d1e3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -137,7 +137,7 @@ public class SSTableLoader
continue;
}
Collection<Range<Token>> ranges = entry.getValue();
- StreamOutSession session = StreamOutSession.create(keyspace, remote, new CountDownCallback(future.latch, remote));
+ StreamOutSession session = StreamOutSession.create(keyspace, remote, new CountDownCallback(future, remote));
// transferSSTables assumes references have been acquired
SSTableReader.acquireReferences(sstables);
StreamOut.transferSSTables(session, sstables, ranges, OperationType.BULK_LOAD);
@@ -150,6 +150,7 @@ public class SSTableLoader
{
final CountDownLatch latch;
final Map<InetAddress, Collection<PendingFile>> pendingFiles;
+ private List<InetAddress> failedHosts = new ArrayList<InetAddress>();
private LoaderFuture(int request)
{
@@ -162,6 +163,16 @@ public class SSTableLoader
pendingFiles.put(remote, new ArrayList(files));
}
+ private void setFailed(InetAddress addr)
+ {
+ failedHosts.add(addr);
+ }
+
+ public List<InetAddress> getFailedHosts()
+ {
+ return failedHosts;
+ }
+
public boolean cancel(boolean mayInterruptIfRunning)
{
throw new UnsupportedOperationException("Cancellation is not yet supported");
@@ -192,6 +203,11 @@ public class SSTableLoader
return latch.getCount() == 0;
}
+ public boolean hadFailures()
+ {
+ return failedHosts.size() > 0;
+ }
+
public Map<InetAddress, Collection<PendingFile>> getPendingFiles()
{
return pendingFiles;
@@ -209,28 +225,30 @@ public class SSTableLoader
private class CountDownCallback implements IStreamCallback
{
private final InetAddress endpoint;
- private final CountDownLatch latch;
+ private final LoaderFuture future;
- CountDownCallback(CountDownLatch latch, InetAddress endpoint)
+ CountDownCallback(LoaderFuture future, InetAddress endpoint)
{
- this.latch = latch;
+ this.future = future;
this.endpoint = endpoint;
}
public void onSuccess()
{
- latch.countDown();
- outputHandler.debug(String.format("Streaming session to %s completed (waiting on %d outstanding sessions)", endpoint, latch.getCount()));
+ future.latch.countDown();
+ outputHandler.debug(String.format("Streaming session to %s completed (waiting on %d outstanding sessions)", endpoint, future.latch.getCount()));
// There could be race with stop being called twice but it should be ok
- if (latch.getCount() == 0)
+ if (future.latch.getCount() == 0)
client.stop();
}
public void onFailure()
{
outputHandler.output(String.format("Streaming session to %s failed", endpoint));
- onSuccess(); // call onSuccess for latch countdown
+ future.setFailed(endpoint);
+ future.latch.countDown();
+ client.stop();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67b340be/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index 8ff2b83..5b62af6 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -47,8 +47,7 @@ public class FileStreamTask extends WrappedRunnable
private static Logger logger = LoggerFactory.getLogger(FileStreamTask.class);
public static final int CHUNK_SIZE = 64 * 1024;
- // around 10 minutes at the default rpctimeout
- public static final int MAX_CONNECT_ATTEMPTS = 8;
+ public static final int MAX_CONNECT_ATTEMPTS = 4;
protected final StreamHeader header;
protected final InetAddress to;
@@ -270,7 +269,8 @@ public class FileStreamTask extends WrappedRunnable
protected void close() throws IOException
{
- output.close();
+ if (output != null)
+ output.close();
}
public String toString()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67b340be/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 47936e9..4520188 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -86,6 +86,12 @@ public class BulkLoader
}
if (!printEnd)
indicator.printProgress();
+ if (future.hadFailures())
+ {
+ System.err.println("Streaming to the following hosts failed:");
+ System.err.println(future.getFailedHosts());
+ System.exit(1);
+ }
}
System.exit(0); // We need that to stop non daemonized threads