You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/04/16 18:52:15 UTC

[2/3] git commit: sstableloader detects and reports failures. Patch by brandonwilliams reviewed by Yuki Morishita for CASSANDRA-4146

sstableloader detects and reports failures.
Patch by brandonwilliams reviewed by Yuki Morishita for CASSANDRA-4146


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67b340be
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67b340be
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67b340be

Branch: refs/heads/trunk
Commit: 67b340becd1aa369c71af10e5b76570e73809e2e
Parents: f57f1c0
Author: Brandon Williams <br...@apache.org>
Authored: Mon Apr 16 11:50:33 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Apr 16 11:50:33 2012 -0500

----------------------------------------------------------------------
 .../apache/cassandra/io/sstable/SSTableLoader.java |   34 +++++++++++---
 .../apache/cassandra/streaming/FileStreamTask.java |    6 +-
 .../org/apache/cassandra/tools/BulkLoader.java     |    6 +++
 3 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/67b340be/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 79259ec..c76d1e3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -137,7 +137,7 @@ public class SSTableLoader
                 continue;
             }
             Collection<Range<Token>> ranges = entry.getValue();
-            StreamOutSession session = StreamOutSession.create(keyspace, remote, new CountDownCallback(future.latch, remote));
+            StreamOutSession session = StreamOutSession.create(keyspace, remote, new CountDownCallback(future, remote));
             // transferSSTables assumes references have been acquired
             SSTableReader.acquireReferences(sstables);
             StreamOut.transferSSTables(session, sstables, ranges, OperationType.BULK_LOAD);
@@ -150,6 +150,7 @@ public class SSTableLoader
     {
         final CountDownLatch latch;
         final Map<InetAddress, Collection<PendingFile>> pendingFiles;
+        private List<InetAddress> failedHosts = new ArrayList<InetAddress>();
 
         private LoaderFuture(int request)
         {
@@ -162,6 +163,16 @@ public class SSTableLoader
             pendingFiles.put(remote, new ArrayList(files));
         }
 
+        private void setFailed(InetAddress addr)
+        {
+            failedHosts.add(addr);
+        }
+
+        public List<InetAddress> getFailedHosts()
+        {
+            return failedHosts;
+        }
+
         public boolean cancel(boolean mayInterruptIfRunning)
         {
             throw new UnsupportedOperationException("Cancellation is not yet supported");
@@ -192,6 +203,11 @@ public class SSTableLoader
             return latch.getCount() == 0;
         }
 
+        public boolean hadFailures()
+        {
+            return failedHosts.size() > 0;
+        }
+
         public Map<InetAddress, Collection<PendingFile>> getPendingFiles()
         {
             return pendingFiles;
@@ -209,28 +225,30 @@ public class SSTableLoader
     private class CountDownCallback implements IStreamCallback
     {
         private final InetAddress endpoint;
-        private final CountDownLatch latch;
+        private final LoaderFuture future;
 
-        CountDownCallback(CountDownLatch latch, InetAddress endpoint)
+        CountDownCallback(LoaderFuture future, InetAddress endpoint)
         {
-            this.latch = latch;
+            this.future = future;
             this.endpoint = endpoint;
         }
 
         public void onSuccess()
         {
-            latch.countDown();
-            outputHandler.debug(String.format("Streaming session to %s completed (waiting on %d outstanding sessions)", endpoint, latch.getCount()));
+            future.latch.countDown();
+            outputHandler.debug(String.format("Streaming session to %s completed (waiting on %d outstanding sessions)", endpoint, future.latch.getCount()));
 
             // There could be race with stop being called twice but it should be ok
-            if (latch.getCount() == 0)
+            if (future.latch.getCount() == 0)
                 client.stop();
         }
 
         public void onFailure()
         {
             outputHandler.output(String.format("Streaming session to %s failed", endpoint));
-            onSuccess(); // call onSuccess for latch countdown
+            future.setFailed(endpoint);
+            future.latch.countDown();
+            client.stop();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67b340be/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index 8ff2b83..5b62af6 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -47,8 +47,7 @@ public class FileStreamTask extends WrappedRunnable
     private static Logger logger = LoggerFactory.getLogger(FileStreamTask.class);
 
     public static final int CHUNK_SIZE = 64 * 1024;
-    // around 10 minutes at the default rpctimeout
-    public static final int MAX_CONNECT_ATTEMPTS = 8;
+    public static final int MAX_CONNECT_ATTEMPTS = 4;
 
     protected final StreamHeader header;
     protected final InetAddress to;
@@ -270,7 +269,8 @@ public class FileStreamTask extends WrappedRunnable
 
     protected void close() throws IOException
     {
-        output.close();
+        if (output != null)
+            output.close();
     }
 
     public String toString()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67b340be/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 47936e9..4520188 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -86,6 +86,12 @@ public class BulkLoader
                 }
                 if (!printEnd)
                     indicator.printProgress();
+                if (future.hadFailures())
+                {
+                    System.err.println("Streaming to the following hosts failed:");
+                    System.err.println(future.getFailedHosts());
+                    System.exit(1);
+                }
             }
 
             System.exit(0); // We need that to stop non daemonized threads