You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/11/27 11:43:14 UTC
[1/2] cassandra git commit: fix 2.2 eclipse-warnings
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 7b430eee6 -> c7724e6b3
fix 2.2 eclipse-warnings
patch by Ariel Weisberg; reviewed by Robert Stupp for CASSANDRA-9800
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8fc0311
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8fc0311
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8fc0311
Branch: refs/heads/cassandra-3.0
Commit: f8fc0311b65b3d82737352f3d01483c0334a6867
Parents: 61e0251
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Fri Nov 27 11:40:16 2015 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Fri Nov 27 11:40:16 2015 +0100
----------------------------------------------------------------------
.../apache/cassandra/cache/AutoSavingCache.java | 1 +
.../db/WindowsFailedSnapshotTracker.java | 41 ++---
.../db/commitlog/CommitLogReplayer.java | 3 +-
.../hadoop/AbstractColumnFamilyInputFormat.java | 4 +-
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 160 +++++++++++--------
.../cassandra/hadoop/pig/CqlNativeStorage.java | 6 +-
.../io/util/ChecksummedRandomAccessReader.java | 29 +++-
.../apache/cassandra/io/util/SegmentedFile.java | 1 +
.../cassandra/net/IncomingTcpConnection.java | 3 +-
9 files changed, 149 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index c08925d..2c6820e 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -318,6 +318,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate));
}
+ @SuppressWarnings("resource")
public void saveCache()
{
logger.trace("Deleting old {} files.", cacheType);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java b/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
index 9e6bb47..7cc7893 100644
--- a/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
+++ b/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
@@ -52,32 +52,33 @@ public class WindowsFailedSnapshotTracker
{
try
{
- BufferedReader reader = new BufferedReader(new FileReader(TODELETEFILE));
- String snapshotDirectory;
- while ((snapshotDirectory = reader.readLine()) != null)
+ try (BufferedReader reader = new BufferedReader(new FileReader(TODELETEFILE)))
{
- File f = new File(snapshotDirectory);
+ String snapshotDirectory;
+ while ((snapshotDirectory = reader.readLine()) != null)
+ {
+ File f = new File(snapshotDirectory);
- // Skip folders that aren't a subset of temp or a data folder. We don't want people to accidentally
- // delete something important by virtue of adding something invalid to the .toDelete file.
- boolean validFolder = FileUtils.isSubDirectory(new File(System.getenv("TEMP")), f);
- for (String s : DatabaseDescriptor.getAllDataFileLocations())
- validFolder |= FileUtils.isSubDirectory(new File(s), f);
+ // Skip folders that aren't a subset of temp or a data folder. We don't want people to accidentally
+ // delete something important by virtue of adding something invalid to the .toDelete file.
+ boolean validFolder = FileUtils.isSubDirectory(new File(System.getenv("TEMP")), f);
+ for (String s : DatabaseDescriptor.getAllDataFileLocations())
+ validFolder |= FileUtils.isSubDirectory(new File(s), f);
- if (!validFolder)
- {
- logger.warn("Skipping invalid directory found in .toDelete: {}. Only %TEMP% or data file subdirectories are valid.", f);
- continue;
- }
+ if (!validFolder)
+ {
+ logger.warn("Skipping invalid directory found in .toDelete: {}. Only %TEMP% or data file subdirectories are valid.", f);
+ continue;
+ }
- // Could be a non-existent directory if deletion worked on previous JVM shutdown.
- if (f.exists())
- {
- logger.warn("Discovered obsolete snapshot. Deleting directory [{}]", snapshotDirectory);
- FileUtils.deleteRecursive(new File(snapshotDirectory));
+ // Could be a non-existent directory if deletion worked on previous JVM shutdown.
+ if (f.exists())
+ {
+ logger.warn("Discovered obsolete snapshot. Deleting directory [{}]", snapshotDirectory);
+ FileUtils.deleteRecursive(new File(snapshotDirectory));
+ }
}
}
- reader.close();
// Only delete the old .toDelete file if we succeed in deleting all our known bad snapshots.
Files.delete(Paths.get(TODELETEFILE));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index cb02a8c..98fb556 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -197,7 +197,7 @@ public class CommitLogReplayer
}
return end;
}
-
+
abstract static class ReplayFilter
{
public abstract Iterable<ColumnFamily> filter(Mutation mutation);
@@ -273,6 +273,7 @@ public class CommitLogReplayer
}
}
+ @SuppressWarnings("resource")
public void recover(File file, boolean tolerateTruncation) throws IOException
{
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 148c08a..3c088c2 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -117,9 +117,9 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
}
}
- try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf))
+ try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf);
+ Session session = cluster.connect())
{
- Session session = cluster.connect();
Metadata metadata = session.getCluster().getMetadata();
for (TokenRange range : masterRangeNodes.keySet())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 14e24fb..84102a5 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.util.Progressable;
/**
* The <code>CqlRecordWriter</code> maps the output <key, value>
* pairs to a Cassandra table. In particular, it applies the binded variables
- * in the value to the prepared statement, which it associates with the key, and in
+ * in the value to the prepared statement, which it associates with the key, and in
* turn the responsible endpoint.
*
* <p>
@@ -112,11 +112,11 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
this.clients = new HashMap<>();
+ String keyspace = ConfigHelper.getOutputKeyspace(conf);
- try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
+ try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
+ Session client = cluster.connect(keyspace))
{
- String keyspace = ConfigHelper.getOutputKeyspace(conf);
- Session client = cluster.connect(keyspace);
ringCache = new NativeRingCache(conf);
if (client != null)
{
@@ -179,7 +179,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
if (clientException != null)
throw clientException;
}
-
+
/**
* If the key is to be associated with a valid value, a mutation is created
* for it with the given table and columns. In the event the value
@@ -225,6 +225,20 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
HadoopCompat.progress(context);
}
+ private static void closeSession(Session session)
+ {
+ //Close the session to satisfy to avoid warnings for the resource not being closed
+ try
+ {
+ if (session != null)
+ session.close();
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Error closing connection", t);
+ }
+ }
+
/**
* A client that runs in a threadpool and connects to the list of endpoints for a particular
* range. Bound variables for keys in that range are sent to this client via a queue.
@@ -273,94 +287,104 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
}
}
}
-
+
/**
* Loops collecting cql binded variable values from the queue and sending to Cassandra
*/
+ @SuppressWarnings("resource")
public void run()
{
Session session = null;
- outer:
- while (run || !queue.isEmpty())
+ try
{
- List<ByteBuffer> bindVariables;
- try
+ outer:
+ while (run || !queue.isEmpty())
{
- bindVariables = queue.take();
- }
- catch (InterruptedException e)
- {
- // re-check loop condition after interrupt
- continue;
- }
+ List<ByteBuffer> bindVariables;
+ try
+ {
+ bindVariables = queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ // re-check loop condition after interrupt
+ continue;
+ }
- ListIterator<InetAddress> iter = endpoints.listIterator();
- while (true)
- {
- // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
- if (session != null)
+ ListIterator<InetAddress> iter = endpoints.listIterator();
+ while (true)
{
- try
+ // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
+ if (session != null)
{
- int i = 0;
- PreparedStatement statement = preparedStatement(session);
- while (bindVariables != null)
+ try
{
- BoundStatement boundStatement = new BoundStatement(statement);
- for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+ int i = 0;
+ PreparedStatement statement = preparedStatement(session);
+ while (bindVariables != null)
{
- boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+ BoundStatement boundStatement = new BoundStatement(statement);
+ for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+ {
+ boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+ }
+ session.execute(boundStatement);
+ i++;
+
+ if (i >= batchThreshold)
+ break;
+ bindVariables = queue.poll();
}
- session.execute(boundStatement);
- i++;
-
- if (i >= batchThreshold)
- break;
- bindVariables = queue.poll();
+ break;
}
- break;
+ catch (Exception e)
+ {
+ closeInternal();
+ if (!iter.hasNext())
+ {
+ lastException = new IOException(e);
+ break outer;
+ }
+ }
+ }
+
+ // attempt to connect to a different endpoint
+ try
+ {
+ InetAddress address = iter.next();
+ String host = address.getHostName();
+ cluster = CqlConfigHelper.getOutputCluster(host, conf);
+ closeSession(session);
+ session = cluster.connect();
}
catch (Exception e)
{
+ //If connection died due to Interrupt, just try connecting to the endpoint again.
+ //There are too many ways for the Thread.interrupted() state to be cleared, so
+ //we can't rely on that here. Until the java driver gives us a better way of knowing
+ //that this exception came from an InterruptedException, this is the best solution.
+ if (canRetryDriverConnection(e))
+ {
+ iter.previous();
+ }
closeInternal();
- if (!iter.hasNext())
+
+ // Most exceptions mean something unexpected went wrong to that endpoint, so
+ // we should try again to another. Other exceptions (auth or invalid request) are fatal.
+ if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
{
lastException = new IOException(e);
break outer;
}
- }
- }
-
- // attempt to connect to a different endpoint
- try
- {
- InetAddress address = iter.next();
- String host = address.getHostName();
- cluster = CqlConfigHelper.getOutputCluster(host, conf);
- session = cluster.connect();
- }
- catch (Exception e)
- {
- //If connection died due to Interrupt, just try connecting to the endpoint again.
- //There are too many ways for the Thread.interrupted() state to be cleared, so
- //we can't rely on that here. Until the java driver gives us a better way of knowing
- //that this exception came from an InterruptedException, this is the best solution.
- if (canRetryDriverConnection(e))
- {
- iter.previous();
- }
- closeInternal();
-
- // Most exceptions mean something unexpected went wrong to that endpoint, so
- // we should try again to another. Other exceptions (auth or invalid request) are fatal.
- if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
- {
- lastException = new IOException(e);
- break outer;
}
}
}
}
+ finally
+ {
+ closeSession(session);
+ }
+
// close all our connections once we are done.
closeInternal();
}
@@ -489,9 +513,9 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
private void refreshEndpointMap()
{
String keyspace = ConfigHelper.getOutputKeyspace(conf);
- try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
+ try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
+ Session session = cluster.connect(keyspace))
{
- Session session = cluster.connect(keyspace);
rangeMap = new HashMap<>();
metadata = session.getCluster().getMetadata();
Set<TokenRange> ranges = metadata.getTokenRanges();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 74058b1..8831cf2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -690,7 +690,7 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
else
throw new IOException("bulk_insert_statement is missing in input url parameter");
if (bulkTableAlias != null)
- CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, column_family);
+ CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, column_family);
CqlBulkOutputFormat.setDeleteSourceOnSuccess(conf, bulkDeleteSourceOnSuccess);
if (bulkOutputLocation != null)
conf.set(CqlBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation);
@@ -724,9 +724,9 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
// Only get the schema if we haven't already gotten it
if (!properties.containsKey(signature))
{
- try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf))
+ try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf);
+ Session client = cluster.connect())
{
- Session client = cluster.connect();
client.execute("USE " + keyspace);
// compose the CfDef for the columfamily
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
index 442236d..9015b61 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@ -48,15 +48,36 @@ public class ChecksummedRandomAccessReader extends RandomAccessReader
this.file = file;
}
+ @SuppressWarnings("resource")
public static ChecksummedRandomAccessReader open(File file, File crcFile) throws IOException
{
try (ChannelProxy channel = new ChannelProxy(file))
{
RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
- @SuppressWarnings("resource")
- DataIntegrityMetadata.ChecksumValidator validator =
- new DataIntegrityMetadata.ChecksumValidator(new Adler32(), crcReader, file.getPath());
- return new ChecksummedRandomAccessReader(file, channel, validator);
+ boolean closeCrcReader = true;
+ try
+ {
+ DataIntegrityMetadata.ChecksumValidator validator =
+ new DataIntegrityMetadata.ChecksumValidator(new Adler32(), crcReader, file.getPath());
+ closeCrcReader = false;
+ boolean closeValidator = true;
+ try
+ {
+ ChecksummedRandomAccessReader retval = new ChecksummedRandomAccessReader(file, channel, validator);
+ closeValidator = false;
+ return retval;
+ }
+ finally
+ {
+ if (closeValidator)
+ validator.close();
+ }
+ }
+ finally
+ {
+ if (closeCrcReader)
+ crcReader.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 30707d8..553cc0d 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -179,6 +179,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
return complete(path, -1L);
}
+ @SuppressWarnings("resource")
public SegmentedFile complete(String path, long overrideLength)
{
ChannelProxy channelCopy = getChannel(path);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index f6652b0..a972114 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -108,7 +108,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
close();
}
}
-
+
@Override
public void close()
{
@@ -164,6 +164,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
}
else
{
+ @SuppressWarnings("resource")
ReadableByteChannel channel = socket.getChannel();
in = new NIODataInputStream(channel != null ? channel : Channels.newChannel(socket.getInputStream()), BUFFER_SIZE);
}
[2/2] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by sn...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7724e6b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7724e6b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7724e6b
Branch: refs/heads/cassandra-3.0
Commit: c7724e6b356ed0f3cee1236db52ec2ee425f2495
Parents: 7b430ee f8fc031
Author: Robert Stupp <sn...@snazy.de>
Authored: Fri Nov 27 11:41:21 2015 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Fri Nov 27 11:41:21 2015 +0100
----------------------------------------------------------------------
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 160 +++++++++++--------
1 file changed, 92 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7724e6b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 23beba3,84102a5..96815ef
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@@ -108,14 -109,14 +108,14 @@@ class CqlRecordWriter extends RecordWri
CqlRecordWriter(Configuration conf)
{
this.conf = conf;
- this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
- batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
+ this.queueSize = conf.getInt(CqlOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
+ batchThreshold = conf.getLong(CqlOutputFormat.BATCH_THRESHOLD, 32);
this.clients = new HashMap<>();
+ String keyspace = ConfigHelper.getOutputKeyspace(conf);
- try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
+ try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
+ Session client = cluster.connect(keyspace))
{
- String keyspace = ConfigHelper.getOutputKeyspace(conf);
- Session client = cluster.connect(keyspace);
ringCache = new NativeRingCache(conf);
if (client != null)
{
@@@ -279,45 -295,67 +294,68 @@@
public void run()
{
Session session = null;
- outer:
- while (run || !queue.isEmpty())
++
+ try
{
- List<ByteBuffer> bindVariables;
- try
+ outer:
+ while (run || !queue.isEmpty())
{
- bindVariables = queue.take();
- }
- catch (InterruptedException e)
- {
- // re-check loop condition after interrupt
- continue;
- }
+ List<ByteBuffer> bindVariables;
+ try
+ {
+ bindVariables = queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ // re-check loop condition after interrupt
+ continue;
+ }
- ListIterator<InetAddress> iter = endpoints.listIterator();
- while (true)
- {
- // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
- if (session != null)
+ ListIterator<InetAddress> iter = endpoints.listIterator();
+ while (true)
{
- try
+ // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
+ if (session != null)
{
- int i = 0;
- PreparedStatement statement = preparedStatement(session);
- while (bindVariables != null)
+ try
{
- BoundStatement boundStatement = new BoundStatement(statement);
- for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+ int i = 0;
+ PreparedStatement statement = preparedStatement(session);
+ while (bindVariables != null)
{
- boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+ BoundStatement boundStatement = new BoundStatement(statement);
+ for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+ {
+ boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+ }
+ session.execute(boundStatement);
+ i++;
+
+ if (i >= batchThreshold)
+ break;
+ bindVariables = queue.poll();
}
- session.execute(boundStatement);
- i++;
-
- if (i >= batchThreshold)
- break;
- bindVariables = queue.poll();
+ break;
}
- break;
+ catch (Exception e)
+ {
+ closeInternal();
+ if (!iter.hasNext())
+ {
+ lastException = new IOException(e);
+ break outer;
+ }
+ }
+ }
+
+ // attempt to connect to a different endpoint
+ try
+ {
+ InetAddress address = iter.next();
+ String host = address.getHostName();
+ cluster = CqlConfigHelper.getOutputCluster(host, conf);
+ closeSession(session);
+ session = cluster.connect();
}
catch (Exception e)
{
@@@ -329,37 -378,13 +378,12 @@@
}
}
}
-
- // attempt to connect to a different endpoint
- try
- {
- InetAddress address = iter.next();
- String host = address.getHostName();
- cluster = CqlConfigHelper.getOutputCluster(host, conf);
- session = cluster.connect();
- }
- catch (Exception e)
- {
- //If connection died due to Interrupt, just try connecting to the endpoint again.
- //There are too many ways for the Thread.interrupted() state to be cleared, so
- //we can't rely on that here. Until the java driver gives us a better way of knowing
- //that this exception came from an InterruptedException, this is the best solution.
- if (canRetryDriverConnection(e))
- {
- iter.previous();
- }
- closeInternal();
-
- // Most exceptions mean something unexpected went wrong to that endpoint, so
- // we should try again to another. Other exceptions (auth or invalid request) are fatal.
- if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
- {
- lastException = new IOException(e);
- break outer;
- }
- }
}
}
+ finally
+ {
+ closeSession(session);
+ }
-
// close all our connections once we are done.
closeInternal();
}