You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/06/15 21:35:50 UTC

[2/3] cassandra git commit: Reorder operations in CqlRecordWriter main run loop

Reorder operations in CqlRecordWriter main run loop

Patch by Philip Thompson; reviewed by Sam Tunnicliffe for CASSANDRA-9576


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/17d43fa5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/17d43fa5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/17d43fa5

Branch: refs/heads/trunk
Commit: 17d43fa55eca29be492a716f04d9ceff1989762d
Parents: 2e92cf8
Author: Philip Thompson <pt...@gmail.com>
Authored: Mon Jun 15 11:55:04 2015 -0400
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Mon Jun 15 19:57:08 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 61 ++++++++++----------
 2 files changed, 30 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/17d43fa5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 020cb46..ba8ef12 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2
+ * Fix connection leak in CqlRecordWriter (CASSANDRA-9576)
  * Mlockall before opening system sstables & remove boot_without_jna option (CASSANDRA-9573)
  * Add functions to convert timeuuid to date or time, deprecate dateOf and unixTimestampOf (CASSANDRA-9229)
  * Make sure we cancel non-compacting sstables from LifecycleTransaction (CASSANDRA-9566)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17d43fa5/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 78b0494..6e8ffd9 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -299,36 +299,6 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                 while (true)
                 {
                     // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
-
-                    // attempt to connect to a different endpoint
-                    try
-                    {
-                        InetAddress address = iter.next();
-                        String host = address.getHostName();
-                        client = CqlConfigHelper.getOutputCluster(host, conf).connect();
-                    }
-                    catch (Exception e)
-                    {
-                        //If connection died due to Interrupt, just try connecting to the endpoint again.
-                        //There are too many ways for the Thread.interrupted() state to be cleared, so
-                        //we can't rely on that here. Until the java driver gives us a better way of knowing
-                        //that this exception came from an InterruptedException, this is the best solution.
-                        if (canRetryDriverConnection(e))
-                        {
-                            iter.previous();
-                        }
-                        closeInternal();
-
-                        // Most exceptions mean something unexpected went wrong to that endpoint, so
-                        // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
-                        if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
-                        {
-                            lastException = new IOException(e);
-                            break outer;
-                        }
-                        continue;
-                    }
-
                     try
                     {
                         int i = 0;
@@ -342,7 +312,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                             }
                             client.execute(boundStatement);
                             i++;
-                            
+
                             if (i >= batchThreshold)
                                 break;
                             bindVariables = queue.poll();
@@ -359,6 +329,33 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                         }
                     }
 
+                    // attempt to connect to a different endpoint
+                    try
+                    {
+                        InetAddress address = iter.next();
+                        String host = address.getHostName();
+                        client = CqlConfigHelper.getOutputCluster(host, conf).connect();
+                    }
+                    catch (Exception e)
+                    {
+                        //If connection died due to Interrupt, just try connecting to the endpoint again.
+                        //There are too many ways for the Thread.interrupted() state to be cleared, so
+                        //we can't rely on that here. Until the java driver gives us a better way of knowing
+                        //that this exception came from an InterruptedException, this is the best solution.
+                        if (canRetryDriverConnection(e))
+                        {
+                            iter.previous();
+                        }
+                        closeInternal();
+
+                        // Most exceptions mean something unexpected went wrong to that endpoint, so
+                        // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
+                        if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
+                        {
+                            lastException = new IOException(e);
+                            break outer;
+                        }
+                    }
                 }
             }
             // close all our connections once we are done.
@@ -409,7 +406,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
         {
             if (client != null)
             {
-                client.close();;
+                client.close();
             }
         }