You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/05/22 19:51:42 UTC
[1/3] cassandra git commit: More fixes to connection error handling in CqlRecordWriter
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 5ab14968e -> 0d24b1a80
refs/heads/trunk 491f7dc27 -> 8d1a50e09
More fixes to connection error handling in CqlRecordWriter
Patch by Philip Thompson; reviewed by Sam Tunnicliffe for CASSANDRA-9442
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d24b1a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d24b1a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d24b1a8
Branch: refs/heads/cassandra-2.2
Commit: 0d24b1a802f67641b534cf2a407342f6129862ef
Parents: 5ab1496
Author: Philip Thompson <pt...@gmail.com>
Authored: Wed May 20 13:56:31 2015 -0400
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Fri May 22 18:39:59 2015 +0100
----------------------------------------------------------------------
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 57 +++++++++++---------
1 file changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d24b1a8/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index c507197..91753a2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -26,30 +26,18 @@ import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.datastax.driver.core.exceptions.AuthenticationException;
-import com.datastax.driver.core.exceptions.DriverException;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.ColumnMetadata;
-import com.datastax.driver.core.Host;
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.TableMetadata;
-import com.datastax.driver.core.TokenRange;
-import org.apache.cassandra.db.marshal.AbstractType;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.*;
import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.*;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
/**
@@ -214,7 +202,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
TokenRange range = ringCache.getRange(getPartitionKey(keyColumns));
// get the client for the given range, or create a new one
- final InetAddress address = ringCache.getEndpoints(range).get(0);
+ final InetAddress address = ringCache.getEndpoints(range).get(0);
RangeClient client = clients.get(address);
if (client == null)
{
@@ -325,8 +313,8 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
//There are too many ways for the Thread.interrupted() state to be cleared, so
//we can't rely on that here. Until the java driver gives us a better way of knowing
//that this exception came from an InterruptedException, this is the best solution.
- if (e instanceof DriverException && e.getMessage().contains("Connection thread interrupted")) {
- lastException = new IOException(e);
+ if (canRetryDriverConnection(e))
+ {
iter.previous();
}
closeInternal();
@@ -417,7 +405,6 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
throw lastException;
}
-
protected void closeInternal()
{
if (client != null)
@@ -425,6 +412,24 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
client.close();;
}
}
+
+ private boolean canRetryDriverConnection(Exception e)
+ {
+ if (e instanceof DriverException && e.getMessage().contains("Connection thread interrupted"))
+ return true;
+ if (e instanceof NoHostAvailableException)
+ {
+ if (((NoHostAvailableException) e).getErrors().values().size() == 1)
+ {
+ Throwable cause = ((NoHostAvailableException) e).getErrors().values().iterator().next();
+ if (cause != null && cause.getCause() instanceof java.nio.channels.ClosedByInterruptException)
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
}
private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
[3/3] cassandra git commit: Merge branch 'cassandra-2.2' into trunk
Posted by sa...@apache.org.
Merge branch 'cassandra-2.2' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8d1a50e0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8d1a50e0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8d1a50e0
Branch: refs/heads/trunk
Commit: 8d1a50e0978ab47d40c115995ccf9bafc9a17450
Parents: 491f7dc 0d24b1a
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Fri May 22 18:41:42 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Fri May 22 18:41:42 2015 +0100
----------------------------------------------------------------------
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 57 +++++++++++---------
1 file changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
[2/3] cassandra git commit: More fixes to connection error handling in CqlRecordWriter
Posted by sa...@apache.org.
More fixes to connection error handling in CqlRecordWriter
Patch by Philip Thompson; reviewed by Sam Tunnicliffe for CASSANDRA-9442
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d24b1a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d24b1a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d24b1a8
Branch: refs/heads/trunk
Commit: 0d24b1a802f67641b534cf2a407342f6129862ef
Parents: 5ab1496
Author: Philip Thompson <pt...@gmail.com>
Authored: Wed May 20 13:56:31 2015 -0400
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Fri May 22 18:39:59 2015 +0100
----------------------------------------------------------------------
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 57 +++++++++++---------
1 file changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d24b1a8/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index c507197..91753a2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -26,30 +26,18 @@ import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.datastax.driver.core.exceptions.AuthenticationException;
-import com.datastax.driver.core.exceptions.DriverException;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.ColumnMetadata;
-import com.datastax.driver.core.Host;
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.TableMetadata;
-import com.datastax.driver.core.TokenRange;
-import org.apache.cassandra.db.marshal.AbstractType;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.*;
import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.*;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
/**
@@ -214,7 +202,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
TokenRange range = ringCache.getRange(getPartitionKey(keyColumns));
// get the client for the given range, or create a new one
- final InetAddress address = ringCache.getEndpoints(range).get(0);
+ final InetAddress address = ringCache.getEndpoints(range).get(0);
RangeClient client = clients.get(address);
if (client == null)
{
@@ -325,8 +313,8 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
//There are too many ways for the Thread.interrupted() state to be cleared, so
//we can't rely on that here. Until the java driver gives us a better way of knowing
//that this exception came from an InterruptedException, this is the best solution.
- if (e instanceof DriverException && e.getMessage().contains("Connection thread interrupted")) {
- lastException = new IOException(e);
+ if (canRetryDriverConnection(e))
+ {
iter.previous();
}
closeInternal();
@@ -417,7 +405,6 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
throw lastException;
}
-
protected void closeInternal()
{
if (client != null)
@@ -425,6 +412,24 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
client.close();;
}
}
+
+ private boolean canRetryDriverConnection(Exception e)
+ {
+ if (e instanceof DriverException && e.getMessage().contains("Connection thread interrupted"))
+ return true;
+ if (e instanceof NoHostAvailableException)
+ {
+ if (((NoHostAvailableException) e).getErrors().values().size() == 1)
+ {
+ Throwable cause = ((NoHostAvailableException) e).getErrors().values().iterator().next();
+ if (cause != null && cause.getCause() instanceof java.nio.channels.ClosedByInterruptException)
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
}
private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)