Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/11/27 11:43:14 UTC

[1/2] cassandra git commit: fix 2.2 eclipse-warnings

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 7b430eee6 -> c7724e6b3


fix 2.2 eclipse-warnings

patch by Ariel Weisberg; reviewed by Robert Stupp for CASSANDRA-9800


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8fc0311
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8fc0311
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8fc0311

Branch: refs/heads/cassandra-3.0
Commit: f8fc0311b65b3d82737352f3d01483c0334a6867
Parents: 61e0251
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Fri Nov 27 11:40:16 2015 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Fri Nov 27 11:40:16 2015 +0100

----------------------------------------------------------------------
 .../apache/cassandra/cache/AutoSavingCache.java |   1 +
 .../db/WindowsFailedSnapshotTracker.java        |  41 ++---
 .../db/commitlog/CommitLogReplayer.java         |   3 +-
 .../hadoop/AbstractColumnFamilyInputFormat.java |   4 +-
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 160 +++++++++++--------
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |   6 +-
 .../io/util/ChecksummedRandomAccessReader.java  |  29 +++-
 .../apache/cassandra/io/util/SegmentedFile.java |   1 +
 .../cassandra/net/IncomingTcpConnection.java    |   3 +-
 9 files changed, 149 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
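
A note on the changes below: aside from some trailing-whitespace cleanup, the hunks silence Eclipse's resource-leak warnings in two main ways. Either the resource is moved into a try-with-resources header so it is provably closed on every exit path, or, where ownership is intentionally handed off and the warning is a false positive, the method is annotated with @SuppressWarnings("resource"). A minimal illustrative sketch of both idioms (class, method, and file names here are invented for the example, not taken from the patch):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;

public class ResourceWarningIdioms
{
    // Idiom 1: try-with-resources. The reader is closed automatically,
    // so the compiler/IDE no longer flags a potential leak.
    public static String firstLine(String path) throws IOException
    {
        try (BufferedReader reader = new BufferedReader(new FileReader(path)))
        {
            return reader.readLine();
        }
    }

    // Idiom 2: ownership is transferred to the caller, who must close the
    // returned reader. The warning is a false positive here, so it is
    // suppressed explicitly instead of restructuring the code.
    @SuppressWarnings("resource")
    public static Reader open(String path) throws IOException
    {
        return new BufferedReader(new FileReader(path));
    }
}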


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index c08925d..2c6820e 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -318,6 +318,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate));
         }
 
+        @SuppressWarnings("resource")
         public void saveCache()
         {
             logger.trace("Deleting old {} files.", cacheType);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java b/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
index 9e6bb47..7cc7893 100644
--- a/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
+++ b/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
@@ -52,32 +52,33 @@ public class WindowsFailedSnapshotTracker
         {
             try
             {
-                BufferedReader reader = new BufferedReader(new FileReader(TODELETEFILE));
-                String snapshotDirectory;
-                while ((snapshotDirectory = reader.readLine()) != null)
+                try (BufferedReader reader = new BufferedReader(new FileReader(TODELETEFILE)))
                 {
-                    File f = new File(snapshotDirectory);
+                    String snapshotDirectory;
+                    while ((snapshotDirectory = reader.readLine()) != null)
+                    {
+                        File f = new File(snapshotDirectory);
 
-                    // Skip folders that aren't a subset of temp or a data folder. We don't want people to accidentally
-                    // delete something important by virtue of adding something invalid to the .toDelete file.
-                    boolean validFolder = FileUtils.isSubDirectory(new File(System.getenv("TEMP")), f);
-                    for (String s : DatabaseDescriptor.getAllDataFileLocations())
-                        validFolder |= FileUtils.isSubDirectory(new File(s), f);
+                        // Skip folders that aren't a subset of temp or a data folder. We don't want people to accidentally
+                        // delete something important by virtue of adding something invalid to the .toDelete file.
+                        boolean validFolder = FileUtils.isSubDirectory(new File(System.getenv("TEMP")), f);
+                        for (String s : DatabaseDescriptor.getAllDataFileLocations())
+                            validFolder |= FileUtils.isSubDirectory(new File(s), f);
 
-                    if (!validFolder)
-                    {
-                        logger.warn("Skipping invalid directory found in .toDelete: {}. Only %TEMP% or data file subdirectories are valid.", f);
-                        continue;
-                    }
+                        if (!validFolder)
+                        {
+                            logger.warn("Skipping invalid directory found in .toDelete: {}. Only %TEMP% or data file subdirectories are valid.", f);
+                            continue;
+                        }
 
-                    // Could be a non-existent directory if deletion worked on previous JVM shutdown.
-                    if (f.exists())
-                    {
-                        logger.warn("Discovered obsolete snapshot. Deleting directory [{}]", snapshotDirectory);
-                        FileUtils.deleteRecursive(new File(snapshotDirectory));
+                        // Could be a non-existent directory if deletion worked on previous JVM shutdown.
+                        if (f.exists())
+                        {
+                            logger.warn("Discovered obsolete snapshot. Deleting directory [{}]", snapshotDirectory);
+                            FileUtils.deleteRecursive(new File(snapshotDirectory));
+                        }
                     }
                 }
-                reader.close();
 
                 // Only delete the old .toDelete file if we succeed in deleting all our known bad snapshots.
                 Files.delete(Paths.get(TODELETEFILE));
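
The change above replaces a manually closed BufferedReader with try-with-resources, so the reader in WindowsFailedSnapshotTracker is closed even if readLine() or the deletion logic throws, and the .toDelete file itself is only removed after every entry has been handled. A simplified, hypothetical sketch of that shape (ToDeleteReplay and replay() are illustrative names, not the patch's code):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ToDeleteReplay
{
    // Read one path per line and act on it; try-with-resources closes the
    // reader whether the loop finishes normally or throws.
    public static void replay(String toDeleteFile) throws IOException
    {
        try (BufferedReader reader = new BufferedReader(new FileReader(toDeleteFile)))
        {
            String line;
            while ((line = reader.readLine()) != null)
            {
                // validation and recursive deletion happen here in the real code
                System.out.println("would process: " + line);
            }
        }
        // only delete the tracking file once all entries were processed
        Files.delete(Paths.get(toDeleteFile));
    }
}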

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index cb02a8c..98fb556 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -197,7 +197,7 @@ public class CommitLogReplayer
         }
         return end;
     }
-    
+
     abstract static class ReplayFilter
     {
         public abstract Iterable<ColumnFamily> filter(Mutation mutation);
@@ -273,6 +273,7 @@ public class CommitLogReplayer
         }
     }
 
+    @SuppressWarnings("resource")
     public void recover(File file, boolean tolerateTruncation) throws IOException
     {
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 148c08a..3c088c2 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -117,9 +117,9 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
             }
         }
 
-        try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf))
+        try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf);
+             Session session = cluster.connect())
         {
-            Session session = cluster.connect();
             Metadata metadata = session.getCluster().getMetadata();
 
             for (TokenRange range : masterRangeNodes.keySet())
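
The hunk above folds the Session into the same try-with-resources header as the Cluster. Resources declared in one header are closed in reverse declaration order, so the Session is closed before the Cluster that created it, on every exit path. A small sketch under the assumption of the 2.x DataStax java-driver, where both Cluster and Session implement Closeable (the builder/contact-point handling is illustrative, not the patch's code):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class ClusterNameExample
{
    // Both resources are declared in one header; on exit the Session is
    // closed first, then the Cluster, even if the body throws.
    public static String clusterName(String contactPoint)
    {
        try (Cluster cluster = Cluster.builder().addContactPoint(contactPoint).build();
             Session session = cluster.connect())
        {
            return session.getCluster().getMetadata().getClusterName();
        }
    }
}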

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 14e24fb..84102a5 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.util.Progressable;
 /**
  * The <code>CqlRecordWriter</code> maps the output &lt;key, value&gt;
  * pairs to a Cassandra table. In particular, it applies the bound variables
- * in the value to the prepared statement, which it associates with the key, and in 
+ * in the value to the prepared statement, which it associates with the key, and in
  * turn the responsible endpoint.
  *
  * <p>
@@ -112,11 +112,11 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
         this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
         batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
         this.clients = new HashMap<>();
+        String keyspace = ConfigHelper.getOutputKeyspace(conf);
 
-        try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
+        try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
+             Session client = cluster.connect(keyspace))
         {
-            String keyspace = ConfigHelper.getOutputKeyspace(conf);
-            Session client = cluster.connect(keyspace);
             ringCache = new NativeRingCache(conf);
             if (client != null)
             {
@@ -179,7 +179,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
         if (clientException != null)
             throw clientException;
     }
-    
+
     /**
      * If the key is to be associated with a valid value, a mutation is created
      * for it with the given table and columns. In the event the value
@@ -225,6 +225,20 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
             HadoopCompat.progress(context);
     }
 
+    private static void closeSession(Session session)
+    {
+        //Close the session to avoid warnings about the resource not being closed
+        try
+        {
+            if (session != null)
+                session.close();
+        }
+        catch (Throwable t)
+        {
+            logger.warn("Error closing connection", t);
+        }
+    }
+
     /**
      * A client that runs in a threadpool and connects to the list of endpoints for a particular
      * range. Bound variables for keys in that range are sent to this client via a queue.
@@ -273,94 +287,104 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                 }
             }
         }
-        
+
         /**
          * Loops collecting cql binded variable values from the queue and sending to Cassandra
          */
+        @SuppressWarnings("resource")
         public void run()
         {
             Session session = null;
-            outer:
-            while (run || !queue.isEmpty())
+            try
             {
-                List<ByteBuffer> bindVariables;
-                try
+                outer:
+                while (run || !queue.isEmpty())
                 {
-                    bindVariables = queue.take();
-                }
-                catch (InterruptedException e)
-                {
-                    // re-check loop condition after interrupt
-                    continue;
-                }
+                    List<ByteBuffer> bindVariables;
+                    try
+                    {
+                        bindVariables = queue.take();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // re-check loop condition after interrupt
+                        continue;
+                    }
 
-                ListIterator<InetAddress> iter = endpoints.listIterator();
-                while (true)
-                {
-                    // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
-                    if (session != null)
+                    ListIterator<InetAddress> iter = endpoints.listIterator();
+                    while (true)
                     {
-                        try
+                        // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
+                        if (session != null)
                         {
-                            int i = 0;
-                            PreparedStatement statement = preparedStatement(session);
-                            while (bindVariables != null)
+                            try
                             {
-                                BoundStatement boundStatement = new BoundStatement(statement);
-                                for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+                                int i = 0;
+                                PreparedStatement statement = preparedStatement(session);
+                                while (bindVariables != null)
                                 {
-                                    boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                                    BoundStatement boundStatement = new BoundStatement(statement);
+                                    for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+                                    {
+                                        boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                                    }
+                                    session.execute(boundStatement);
+                                    i++;
+
+                                    if (i >= batchThreshold)
+                                        break;
+                                    bindVariables = queue.poll();
                                 }
-                                session.execute(boundStatement);
-                                i++;
-
-                                if (i >= batchThreshold)
-                                    break;
-                                bindVariables = queue.poll();
+                                break;
                             }
-                            break;
+                            catch (Exception e)
+                            {
+                                closeInternal();
+                                if (!iter.hasNext())
+                                {
+                                    lastException = new IOException(e);
+                                    break outer;
+                                }
+                            }
+                        }
+
+                        // attempt to connect to a different endpoint
+                        try
+                        {
+                            InetAddress address = iter.next();
+                            String host = address.getHostName();
+                            cluster = CqlConfigHelper.getOutputCluster(host, conf);
+                            closeSession(session);
+                            session = cluster.connect();
                         }
                         catch (Exception e)
                         {
+                            //If connection died due to Interrupt, just try connecting to the endpoint again.
+                            //There are too many ways for the Thread.interrupted() state to be cleared, so
+                            //we can't rely on that here. Until the java driver gives us a better way of knowing
+                            //that this exception came from an InterruptedException, this is the best solution.
+                            if (canRetryDriverConnection(e))
+                            {
+                                iter.previous();
+                            }
                             closeInternal();
-                            if (!iter.hasNext())
+
+                            // Most exceptions mean something unexpected went wrong to that endpoint, so
+                            // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
+                            if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
                             {
                                 lastException = new IOException(e);
                                 break outer;
                             }
-                        }                        
-                    }
-
-                    // attempt to connect to a different endpoint
-                    try
-                    {
-                        InetAddress address = iter.next();
-                        String host = address.getHostName();
-                        cluster = CqlConfigHelper.getOutputCluster(host, conf);
-                        session = cluster.connect();
-                    }
-                    catch (Exception e)
-                    {
-                        //If connection died due to Interrupt, just try connecting to the endpoint again.
-                        //There are too many ways for the Thread.interrupted() state to be cleared, so
-                        //we can't rely on that here. Until the java driver gives us a better way of knowing
-                        //that this exception came from an InterruptedException, this is the best solution.
-                        if (canRetryDriverConnection(e))
-                        {
-                            iter.previous();
-                        }
-                        closeInternal();
-
-                        // Most exceptions mean something unexpected went wrong to that endpoint, so
-                        // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
-                        if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
-                        {
-                            lastException = new IOException(e);
-                            break outer;
                         }
                     }
                 }
             }
+            finally
+            {
+                closeSession(session);
+            }
+
             // close all our connections once we are done.
             closeInternal();
         }
@@ -489,9 +513,9 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
         private void refreshEndpointMap()
         {
             String keyspace = ConfigHelper.getOutputKeyspace(conf);
-            try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
+            try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
+                 Session session = cluster.connect(keyspace))
             {
-                Session session = cluster.connect(keyspace);
                 rangeMap = new HashMap<>();
                 metadata = session.getCluster().getMetadata();
                 Set<TokenRange> ranges = metadata.getTokenRanges();
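
The CqlRecordWriter changes above add a closeSession() helper that swallows failures during close, and wrap the worker loop in try/finally so whatever session is live when the loop exits gets released. Try-with-resources does not fit there because the session is repeatedly reopened against different endpoints inside the loop. A generic sketch of that quiet-close-plus-finally shape (all names are hypothetical, and plain AutoCloseable stands in for the driver's Session):

import java.util.function.Supplier;

public class QuietCloseWorker
{
    // Close quietly: a failure while closing should not mask the real
    // outcome of the work, so it is only logged (here, printed).
    private static void closeQuietly(AutoCloseable resource)
    {
        try
        {
            if (resource != null)
                resource.close();
        }
        catch (Throwable t)
        {
            System.err.println("Error closing resource: " + t);
        }
    }

    // The resource may be reopened inside the loop, so try-with-resources
    // does not apply; the finally block guarantees the last one opened
    // is closed no matter how the loop exits.
    public static void run(Supplier<AutoCloseable> opener, int iterations)
    {
        AutoCloseable current = null;
        try
        {
            for (int i = 0; i < iterations; i++)
            {
                closeQuietly(current);   // drop the previous connection, if any
                current = opener.get();  // reconnect, possibly to another endpoint
                // ... do work with 'current' ...
            }
        }
        finally
        {
            closeQuietly(current);       // whatever is still open when we leave
        }
    }
}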

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 74058b1..8831cf2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -690,7 +690,7 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
             else
                 throw new IOException("bulk_insert_statement is missing in input url parameter");
             if (bulkTableAlias != null)
-                CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, column_family); 
+                CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, column_family);
             CqlBulkOutputFormat.setDeleteSourceOnSuccess(conf, bulkDeleteSourceOnSuccess);
             if (bulkOutputLocation != null)
                 conf.set(CqlBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation);
@@ -724,9 +724,9 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
         // Only get the schema if we haven't already gotten it
         if (!properties.containsKey(signature))
         {
-            try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf))
+            try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf);
+                 Session client = cluster.connect())
             {
-                Session client = cluster.connect();
                 client.execute("USE " + keyspace);
 
                 // compose the CfDef for the column family

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
index 442236d..9015b61 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@ -48,15 +48,36 @@ public class ChecksummedRandomAccessReader extends RandomAccessReader
         this.file = file;
     }
 
+    @SuppressWarnings("resource")
     public static ChecksummedRandomAccessReader open(File file, File crcFile) throws IOException
     {
         try (ChannelProxy channel = new ChannelProxy(file))
         {
             RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
-            @SuppressWarnings("resource")
-            DataIntegrityMetadata.ChecksumValidator validator =
-                new DataIntegrityMetadata.ChecksumValidator(new Adler32(), crcReader, file.getPath());
-            return new ChecksummedRandomAccessReader(file, channel, validator);
+            boolean closeCrcReader = true;
+            try
+            {
+                DataIntegrityMetadata.ChecksumValidator validator =
+                        new DataIntegrityMetadata.ChecksumValidator(new Adler32(), crcReader, file.getPath());
+                closeCrcReader = false;
+                boolean closeValidator = true;
+                try
+                {
+                    ChecksummedRandomAccessReader retval = new ChecksummedRandomAccessReader(file, channel, validator);
+                    closeValidator = false;
+                    return retval;
+                }
+                finally
+                {
+                    if (closeValidator)
+                        validator.close();
+                }
+            }
+            finally
+            {
+                if (closeCrcReader)
+                    crcReader.close();
+            }
         }
     }
 

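The ChecksummedRandomAccessReader.open() change above uses nested try/finally blocks with boolean flags to express ownership transfer: each intermediate resource is closed if a later constructor throws, but left alone once the fully constructed reader has taken responsibility for it. A stripped-down sketch of the same pattern with a single level of nesting (Wrapper and open() are invented for illustration):

import java.io.Closeable;
import java.io.IOException;

public class OwnershipTransfer
{
    // Wrapper takes ownership of 'inner' once it is fully constructed.
    static final class Wrapper implements Closeable
    {
        private final Closeable inner;
        Wrapper(Closeable inner) { this.inner = inner; }
        public void close() throws IOException { inner.close(); }
    }

    // If constructing the Wrapper throws, 'inner' is closed in the finally
    // block; if construction succeeds, the flag flips and the finally block
    // leaves it alone because the Wrapper now owns it.
    public static Wrapper open(Closeable inner) throws IOException
    {
        boolean closeInner = true;
        try
        {
            Wrapper wrapper = new Wrapper(inner);
            closeInner = false;
            return wrapper;
        }
        finally
        {
            if (closeInner)
                inner.close();
        }
    }
}
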
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 30707d8..553cc0d 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -179,6 +179,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
             return complete(path, -1L);
         }
 
+        @SuppressWarnings("resource")
         public SegmentedFile complete(String path, long overrideLength)
         {
             ChannelProxy channelCopy = getChannel(path);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index f6652b0..a972114 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -108,7 +108,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
             close();
         }
     }
-    
+
     @Override
     public void close()
     {
@@ -164,6 +164,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
         }
         else
         {
+            @SuppressWarnings("resource")
             ReadableByteChannel channel = socket.getChannel();
             in = new NIODataInputStream(channel != null ? channel : Channels.newChannel(socket.getInputStream()), BUFFER_SIZE);
         }


[2/2] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by sn...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7724e6b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7724e6b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7724e6b

Branch: refs/heads/cassandra-3.0
Commit: c7724e6b356ed0f3cee1236db52ec2ee425f2495
Parents: 7b430ee f8fc031
Author: Robert Stupp <sn...@snazy.de>
Authored: Fri Nov 27 11:41:21 2015 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Fri Nov 27 11:41:21 2015 +0100

----------------------------------------------------------------------
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 160 +++++++++++--------
 1 file changed, 92 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7724e6b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 23beba3,84102a5..96815ef
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@@ -108,14 -109,14 +108,14 @@@ class CqlRecordWriter extends RecordWri
      CqlRecordWriter(Configuration conf)
      {
          this.conf = conf;
 -        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
 -        batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
 +        this.queueSize = conf.getInt(CqlOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
 +        batchThreshold = conf.getLong(CqlOutputFormat.BATCH_THRESHOLD, 32);
          this.clients = new HashMap<>();
+         String keyspace = ConfigHelper.getOutputKeyspace(conf);
  
-         try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
+         try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
+              Session client = cluster.connect(keyspace))
          {
-             String keyspace = ConfigHelper.getOutputKeyspace(conf);
-             Session client = cluster.connect(keyspace);
              ringCache = new NativeRingCache(conf);
              if (client != null)
              {
@@@ -279,45 -295,67 +294,68 @@@
          public void run()
          {
              Session session = null;
-             outer:
-             while (run || !queue.isEmpty())
++
+             try
              {
-                 List<ByteBuffer> bindVariables;
-                 try
+                 outer:
+                 while (run || !queue.isEmpty())
                  {
-                     bindVariables = queue.take();
-                 }
-                 catch (InterruptedException e)
-                 {
-                     // re-check loop condition after interrupt
-                     continue;
-                 }
+                     List<ByteBuffer> bindVariables;
+                     try
+                     {
+                         bindVariables = queue.take();
+                     }
+                     catch (InterruptedException e)
+                     {
+                         // re-check loop condition after interrupt
+                         continue;
+                     }
  
-                 ListIterator<InetAddress> iter = endpoints.listIterator();
-                 while (true)
-                 {
-                     // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
-                     if (session != null)
+                     ListIterator<InetAddress> iter = endpoints.listIterator();
+                     while (true)
                      {
-                         try
+                         // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
+                         if (session != null)
                          {
-                             int i = 0;
-                             PreparedStatement statement = preparedStatement(session);
-                             while (bindVariables != null)
+                             try
                              {
-                                 BoundStatement boundStatement = new BoundStatement(statement);
-                                 for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+                                 int i = 0;
+                                 PreparedStatement statement = preparedStatement(session);
+                                 while (bindVariables != null)
                                  {
-                                     boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                                     BoundStatement boundStatement = new BoundStatement(statement);
+                                     for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+                                     {
+                                         boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                                     }
+                                     session.execute(boundStatement);
+                                     i++;
+ 
+                                     if (i >= batchThreshold)
+                                         break;
+                                     bindVariables = queue.poll();
                                  }
-                                 session.execute(boundStatement);
-                                 i++;
- 
-                                 if (i >= batchThreshold)
-                                     break;
-                                 bindVariables = queue.poll();
+                                 break;
                              }
-                             break;
+                             catch (Exception e)
+                             {
+                                 closeInternal();
+                                 if (!iter.hasNext())
+                                 {
+                                     lastException = new IOException(e);
+                                     break outer;
+                                 }
+                             }
+                         }
+ 
+                         // attempt to connect to a different endpoint
+                         try
+                         {
+                             InetAddress address = iter.next();
+                             String host = address.getHostName();
+                             cluster = CqlConfigHelper.getOutputCluster(host, conf);
+                             closeSession(session);
+                             session = cluster.connect();
                          }
                          catch (Exception e)
                          {
@@@ -329,37 -378,13 +378,12 @@@
                              }
                          }
                      }
- 
-                     // attempt to connect to a different endpoint
-                     try
-                     {
-                         InetAddress address = iter.next();
-                         String host = address.getHostName();
-                         cluster = CqlConfigHelper.getOutputCluster(host, conf);
-                         session = cluster.connect();
-                     }
-                     catch (Exception e)
-                     {
-                         //If connection died due to Interrupt, just try connecting to the endpoint again.
-                         //There are too many ways for the Thread.interrupted() state to be cleared, so
-                         //we can't rely on that here. Until the java driver gives us a better way of knowing
-                         //that this exception came from an InterruptedException, this is the best solution.
-                         if (canRetryDriverConnection(e))
-                         {
-                             iter.previous();
-                         }
-                         closeInternal();
- 
-                         // Most exceptions mean something unexpected went wrong to that endpoint, so
-                         // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
-                         if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
-                         {
-                             lastException = new IOException(e);
-                             break outer;
-                         }
-                     }
                  }
              }
+             finally
+             {
+                 closeSession(session);
+             }
 -
              // close all our connections once we are done.
              closeInternal();
          }