You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/06/15 21:35:50 UTC
[2/3] cassandra git commit: Reorder operations in CqlRecordWriter main run loop
Reorder operations in CqlRecordWriter main run loop
Patch by Philip Thompson; reviewed by Sam Tunnicliffe for CASSANDRA-9576
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/17d43fa5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/17d43fa5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/17d43fa5
Branch: refs/heads/trunk
Commit: 17d43fa55eca29be492a716f04d9ceff1989762d
Parents: 2e92cf8
Author: Philip Thompson <pt...@gmail.com>
Authored: Mon Jun 15 11:55:04 2015 -0400
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Mon Jun 15 19:57:08 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 61 ++++++++++----------
2 files changed, 30 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/17d43fa5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 020cb46..ba8ef12 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2
+ * Fix connection leak in CqlRecordWriter (CASSANDRA-9576)
* Mlockall before opening system sstables & remove boot_without_jna option (CASSANDRA-9573)
* Add functions to convert timeuuid to date or time, deprecate dateOf and unixTimestampOf (CASSANDRA-9229)
* Make sure we cancel non-compacting sstables from LifecycleTransaction (CASSANDRA-9566)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/17d43fa5/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 78b0494..6e8ffd9 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -299,36 +299,6 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
while (true)
{
// send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
-
- // attempt to connect to a different endpoint
- try
- {
- InetAddress address = iter.next();
- String host = address.getHostName();
- client = CqlConfigHelper.getOutputCluster(host, conf).connect();
- }
- catch (Exception e)
- {
- //If connection died due to Interrupt, just try connecting to the endpoint again.
- //There are too many ways for the Thread.interrupted() state to be cleared, so
- //we can't rely on that here. Until the java driver gives us a better way of knowing
- //that this exception came from an InterruptedException, this is the best solution.
- if (canRetryDriverConnection(e))
- {
- iter.previous();
- }
- closeInternal();
-
- // Most exceptions mean something unexpected went wrong to that endpoint, so
- // we should try again to another. Other exceptions (auth or invalid request) are fatal.
- if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
- {
- lastException = new IOException(e);
- break outer;
- }
- continue;
- }
-
try
{
int i = 0;
@@ -342,7 +312,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
}
client.execute(boundStatement);
i++;
-
+
if (i >= batchThreshold)
break;
bindVariables = queue.poll();
@@ -359,6 +329,33 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
}
}
+ // attempt to connect to a different endpoint
+ try
+ {
+ InetAddress address = iter.next();
+ String host = address.getHostName();
+ client = CqlConfigHelper.getOutputCluster(host, conf).connect();
+ }
+ catch (Exception e)
+ {
+ //If connection died due to Interrupt, just try connecting to the endpoint again.
+ //There are too many ways for the Thread.interrupted() state to be cleared, so
+ //we can't rely on that here. Until the java driver gives us a better way of knowing
+ //that this exception came from an InterruptedException, this is the best solution.
+ if (canRetryDriverConnection(e))
+ {
+ iter.previous();
+ }
+ closeInternal();
+
+ // Most exceptions mean something unexpected went wrong to that endpoint, so
+ // we should try again to another. Other exceptions (auth or invalid request) are fatal.
+ if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
+ {
+ lastException = new IOException(e);
+ break outer;
+ }
+ }
}
}
// close all our connections once we are done.
@@ -409,7 +406,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
{
if (client != null)
{
- client.close();;
+ client.close();
}
}