You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/05/22 19:51:43 UTC

[2/3] cassandra git commit: More fixes to connection error handling in CqlRecordWriter

More fixes to connection error handling in CqlRecordWriter

Patch by Philip Thompson; reviewed by Sam Tunnicliffe for CASSANDRA-9442


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d24b1a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d24b1a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d24b1a8

Branch: refs/heads/trunk
Commit: 0d24b1a802f67641b534cf2a407342f6129862ef
Parents: 5ab1496
Author: Philip Thompson <pt...@gmail.com>
Authored: Wed May 20 13:56:31 2015 -0400
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Fri May 22 18:39:59 2015 +0100

----------------------------------------------------------------------
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 57 +++++++++++---------
 1 file changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d24b1a8/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index c507197..91753a2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -26,30 +26,18 @@ import java.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-import com.datastax.driver.core.exceptions.AuthenticationException;
-import com.datastax.driver.core.exceptions.DriverException;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.ColumnMetadata;
-import com.datastax.driver.core.Host;
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.TableMetadata;
-import com.datastax.driver.core.TokenRange;
-import org.apache.cassandra.db.marshal.AbstractType;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.*;
 import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.*;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -214,7 +202,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
         TokenRange range = ringCache.getRange(getPartitionKey(keyColumns));
 
         // get the client for the given range, or create a new one
-	final InetAddress address = ringCache.getEndpoints(range).get(0);
+        final InetAddress address = ringCache.getEndpoints(range).get(0);
         RangeClient client = clients.get(address);
         if (client == null)
         {
@@ -325,8 +313,8 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                         //There are too many ways for the Thread.interrupted() state to be cleared, so
                         //we can't rely on that here. Until the java driver gives us a better way of knowing
                         //that this exception came from an InterruptedException, this is the best solution.
-                        if (e instanceof DriverException && e.getMessage().contains("Connection thread interrupted")) {
-                            lastException = new IOException(e);
+                        if (canRetryDriverConnection(e))
+                        {
                             iter.previous();
                         }
                         closeInternal();
@@ -417,7 +405,6 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                 throw lastException;
         }
 
-
         protected void closeInternal()
         {
             if (client != null)
@@ -425,6 +412,24 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                 client.close();;
             }
         }
+
+        private boolean canRetryDriverConnection(Exception e)
+        {
+            if (e instanceof DriverException && e.getMessage().contains("Connection thread interrupted"))
+                return true;
+            if (e instanceof NoHostAvailableException)
+            {
+                if (((NoHostAvailableException) e).getErrors().values().size() == 1)
+                {
+                    Throwable cause = ((NoHostAvailableException) e).getErrors().values().iterator().next();
+                    if (cause != null && cause.getCause() instanceof java.nio.channels.ClosedByInterruptException)
+                    {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
     }
 
     private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)