You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/05/28 00:00:54 UTC
[2/4] cassandra git commit: Add Static Analysis to warn on unsafe use
of Autocloseable instances
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 2c04475..4dd53ff 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -241,13 +241,14 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
{
- Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
-
- Map<TokenRange, Set<Host>> map = new HashMap<>();
- Metadata metadata = session.getCluster().getMetadata();
- for (TokenRange tokenRange : metadata.getTokenRanges())
- map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
- return map;
+ try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
+ {
+ Map<TokenRange, Set<Host>> map = new HashMap<>();
+ Metadata metadata = session.getCluster().getMetadata();
+ for (TokenRange tokenRange : metadata.getTokenRanges())
+ map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
+ return map;
+ }
}
private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 88dd2e2..f89825f 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -61,6 +61,7 @@ public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<Byt
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
+ @SuppressWarnings("resource")
public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
{
logger.debug("Creating authenticated client for CF input format");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index 94ced69..92e3829 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -114,6 +114,7 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
* @throws Exception set of thrown exceptions may be implementation defined,
* depending on the used transport factory
*/
+ @SuppressWarnings("resource")
public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
{
logger.debug("Creating authenticated client for CF output format");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index d205f13..c103d75 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -82,6 +82,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
this.keyBufferSize = keyBufferSize;
}
+ @SuppressWarnings("resource")
public void close()
{
if (client != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 31c7047..f06f03d 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -249,6 +249,7 @@ final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer, List<Mutat
throw lastException;
}
+ @SuppressWarnings("resource")
protected void closeInternal()
{
if (client != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index b956e23..e81860d 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -543,6 +543,7 @@ public class ConfigHelper
return client;
}
+ @SuppressWarnings("resource")
public static Cassandra.Client createConnection(Configuration conf, String host, Integer port) throws IOException
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 3033fa6..9462724 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -602,13 +602,9 @@ public class CqlConfigHelper
private static SSLContext getSSLContext(String truststorePath, String truststorePassword, String keystorePath, String keystorePassword)
throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, UnrecoverableKeyException, KeyManagementException
{
- FileInputStream tsf = null;
- FileInputStream ksf = null;
SSLContext ctx = null;
- try
+ try (FileInputStream tsf = new FileInputStream(truststorePath); FileInputStream ksf = new FileInputStream(keystorePath))
{
- tsf = new FileInputStream(truststorePath);
- ksf = new FileInputStream(keystorePath);
ctx = SSLContext.getInstance("SSL");
KeyStore ts = KeyStore.getInstance("JKS");
@@ -623,11 +619,6 @@ public class CqlConfigHelper
ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
}
- finally
- {
- FileUtils.closeQuietly(tsf);
- FileUtils.closeQuietly(ksf);
- }
return ctx;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 91753a2..78b0494 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -116,24 +116,24 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
try
{
String keyspace = ConfigHelper.getOutputKeyspace(conf);
- Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace);
- ringCache = new NativeRingCache(conf);
- if (client != null)
- {
- TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
- clusterColumns = tableMetadata.getClusteringColumns();
- partitionKeyColumns = tableMetadata.getPartitionKey();
-
- String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
- if (cqlQuery.toLowerCase().startsWith("insert"))
- throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
- cql = appendKeyWhereClauses(cqlQuery);
-
- client.close();
- }
- else
+ try (Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace))
{
- throw new IllegalArgumentException("Invalid configuration specified " + conf);
+ ringCache = new NativeRingCache(conf);
+ if (client != null)
+ {
+ TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
+ clusterColumns = tableMetadata.getClusteringColumns();
+ partitionKeyColumns = tableMetadata.getPartitionKey();
+
+ String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+ if (cqlQuery.toLowerCase().startsWith("insert"))
+ throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+ cql = appendKeyWhereClauses(cqlQuery);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid configuration specified " + conf);
+ }
}
}
catch (Exception e)
@@ -489,13 +489,15 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
private void refreshEndpointMap()
{
String keyspace = ConfigHelper.getOutputKeyspace(conf);
- Session session = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace);
- rangeMap = new HashMap<>();
- metadata = session.getCluster().getMetadata();
- Set<TokenRange> ranges = metadata.getTokenRanges();
- for (TokenRange range : ranges)
+ try (Session session = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace))
{
- rangeMap.put(range, metadata.getReplicas(keyspace, range));
+ rangeMap = new HashMap<>();
+ metadata = session.getCluster().getMetadata();
+ Set<TokenRange> ranges = metadata.getTokenRanges();
+ for (TokenRange range : ranges)
+ {
+ rangeMap.put(range, metadata.getReplicas(keyspace, range));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 0b833b7..63baf9c 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -699,9 +699,8 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
// Only get the schema if we haven't already gotten it
if (!properties.containsKey(signature))
{
- try
+ try (Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect())
{
- Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect();
client.execute("USE " + keyspace);
// compose the CfDef for the columfamily
@@ -729,9 +728,11 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
{
TableInfo tableInfo = new TableInfo(cfDef);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream( baos );
- oos.writeObject( tableInfo );
- oos.close();
+ try (ObjectOutputStream oos = new ObjectOutputStream( baos ))
+ {
+ oos.writeObject(tableInfo);
+ }
+
return new String( Base64Coder.encode(baos.toByteArray()) );
}
@@ -739,11 +740,11 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
protected static TableInfo cfdefFromString(String st) throws IOException, ClassNotFoundException
{
byte [] data = Base64Coder.decode( st );
- ObjectInputStream ois = new ObjectInputStream(
- new ByteArrayInputStream( data ) );
- Object o = ois.readObject();
- ois.close();
- return (TableInfo)o;
+ try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)))
+ {
+ Object o = ois.readObject();
+ return (TableInfo)o;
+ }
}
/** decompose the query to store the parameters in a map */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index c994a3d..23a9f3e 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -91,17 +91,7 @@ public class CompressionMetadata
{
this.indexFilePath = indexFilePath;
- DataInputStream stream;
- try
- {
- stream = new DataInputStream(new FileInputStream(indexFilePath));
- }
- catch (FileNotFoundException e)
- {
- throw new RuntimeException(e);
- }
-
- try
+ try (DataInputStream stream = new DataInputStream(new FileInputStream(indexFilePath)))
{
String compressorName = stream.readUTF();
int optionCount = stream.readInt();
@@ -126,13 +116,13 @@ public class CompressionMetadata
compressedFileLength = compressedLength;
chunkOffsets = readChunkOffsets(stream);
}
- catch (IOException e)
+ catch (FileNotFoundException e)
{
- throw new CorruptSSTableException(e, indexFilePath);
+ throw new RuntimeException(e);
}
- finally
+ catch (IOException e)
{
- FileUtils.closeQuietly(stream);
+ throw new CorruptSSTableException(e, indexFilePath);
}
this.chunkOffsetsSize = chunkOffsets.size();
@@ -176,32 +166,42 @@ public class CompressionMetadata
*/
private Memory readChunkOffsets(DataInput input)
{
+ final int chunkCount;
try
{
- int chunkCount = input.readInt();
+ chunkCount = input.readInt();
if (chunkCount <= 0)
throw new IOException("Compressed file with 0 chunks encountered: " + input);
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, indexFilePath);
+ }
- Memory offsets = Memory.allocate(chunkCount * 8L);
+ @SuppressWarnings("resource")
+ Memory offsets = Memory.allocate(chunkCount * 8L);
+ int i = 0;
+ try
+ {
- for (int i = 0; i < chunkCount; i++)
+ for (i = 0; i < chunkCount; i++)
{
- try
- {
- offsets.setLong(i * 8L, input.readLong());
- }
- catch (EOFException e)
- {
- String msg = String.format("Corrupted Index File %s: read %d but expected %d chunks.",
- indexFilePath, i, chunkCount);
- throw new CorruptSSTableException(new IOException(msg, e), indexFilePath);
- }
+ offsets.setLong(i * 8L, input.readLong());
}
return offsets;
}
catch (IOException e)
{
+ if (offsets != null)
+ offsets.close();
+
+ if (e instanceof EOFException)
+ {
+ String msg = String.format("Corrupted Index File %s: read %d but expected %d chunks.",
+ indexFilePath, i, chunkCount);
+ throw new CorruptSSTableException(new IOException(msg, e), indexFilePath);
+ }
throw new FSReadError(e, indexFilePath);
}
}
@@ -345,10 +345,8 @@ public class CompressionMetadata
}
// flush the data to disk
- DataOutputStream out = null;
- try
+ try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath))))
{
- out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
writeHeader(out, dataLength, count);
for (int i = 0 ; i < count ; i++)
out.writeLong(offsets.getLong(i * 8L));
@@ -357,12 +355,9 @@ public class CompressionMetadata
{
throw Throwables.propagate(e);
}
- finally
- {
- FileUtils.closeQuietly(out);
- }
}
+ @SuppressWarnings("resource")
public CompressionMetadata open(long dataLength, long compressedLength)
{
SafeMemory offsets = this.offsets.sharedCopy();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 1389ad2..4181ed0 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -509,6 +509,7 @@ public class CQLSSTableWriter implements Closeable
}
}
+ @SuppressWarnings("resource")
public CQLSSTableWriter build()
{
if (directory == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 59c5eef..7df7349 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -289,6 +289,7 @@ public class IndexSummary extends WrappedSharedCloseable
out.write(t.entries, 0, t.entriesLength);
}
+ @SuppressWarnings("resource")
public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel, int expectedMinIndexInterval, int maxIndexInterval) throws IOException
{
int minIndexInterval = in.readInt();
@@ -321,8 +322,17 @@ public class IndexSummary extends WrappedSharedCloseable
Memory offsets = Memory.allocate(offsetCount * 4);
Memory entries = Memory.allocate(offheapSize - offsets.size());
- FBUtilities.copy(in, new MemoryOutputStream(offsets), offsets.size());
- FBUtilities.copy(in, new MemoryOutputStream(entries), entries.size());
+ try
+ {
+ FBUtilities.copy(in, new MemoryOutputStream(offsets), offsets.size());
+ FBUtilities.copy(in, new MemoryOutputStream(entries), entries.size());
+ }
+ catch (IOException ioe)
+ {
+ offsets.free();
+ entries.free();
+ throw ioe;
+ }
// our on-disk representation treats the offsets and the summary data as one contiguous structure,
// in which the offsets are based from the start of the structure. i.e., if the offsets occupy
// X bytes, the value of the first offset will be X. In memory we split the two regions up, so that
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 12e41c8..6110afe 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -281,6 +281,7 @@ public class IndexSummaryBuilder implements AutoCloseable
* @param partitioner the partitioner used for the index summary
* @return a new IndexSummary
*/
+ @SuppressWarnings("resource")
public static IndexSummary downsample(IndexSummary existing, int newSamplingLevel, int minIndexInterval, IPartitioner partitioner)
{
// To downsample the old index summary, we'll go through (potentially) several rounds of downsampling.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 9bfbc99..6f66fd3 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -202,6 +202,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
* Returns a Pair of all compacting and non-compacting sstables. Non-compacting sstables will be marked as
* compacting.
*/
+ @SuppressWarnings("resource")
private Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> getCompactingAndNonCompactingSSTables()
{
List<SSTableReader> allCompacting = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index bc3486a..2077152 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -270,10 +270,8 @@ public abstract class SSTable
protected static void appendTOC(Descriptor descriptor, Collection<Component> components)
{
File tocFile = new File(descriptor.filenameFor(Component.TOC));
- PrintWriter w = null;
- try
+ try (PrintWriter w = new PrintWriter(new FileWriter(tocFile, true)))
{
- w = new PrintWriter(new FileWriter(tocFile, true));
for (Component component : components)
w.println(component.name);
}
@@ -281,10 +279,6 @@ public abstract class SSTable
{
throw new FSWriteError(e, tocFile);
}
- finally
- {
- FileUtils.closeQuietly(w);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 15008d2..b99003b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -71,6 +71,7 @@ public class SSTableLoader implements StreamEventHandler
this.connectionsPerHost = connectionsPerHost;
}
+ @SuppressWarnings("resource")
protected Collection<SSTableReader> openSSTables(final Map<InetAddress, Collection<Range<Token>>> ranges)
{
outputHandler.output("Opening sstables and calculating sections to stream");
@@ -126,9 +127,7 @@ public class SSTableLoader implements StreamEventHandler
List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
- Ref ref = sstable.tryRef();
- if (ref == null)
- throw new IllegalStateException("Could not acquire ref for "+sstable);
+ Ref<SSTableReader> ref = sstable.ref();
StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
streamingDetails.put(endpoint, details);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 8e701b3..54dff4e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -653,16 +653,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
*/
private void loadBloomFilter() throws IOException
{
- DataInputStream stream = null;
- try
+ try (DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER)))))
{
- stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
bf = FilterFactory.deserialize(stream, true);
}
- finally
- {
- FileUtils.closeQuietly(stream);
- }
}
/**
@@ -725,9 +719,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
{
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
- RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
-
- try
+ try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))))
{
long indexSize = primaryIndex.length();
long histogramCount = sstableMetadata.estimatedRowSize.count();
@@ -768,10 +760,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
indexSummary = summaryBuilder.build(partitioner);
}
}
- finally
- {
- FileUtils.closeQuietly(primaryIndex);
- }
first = getMinimalKey(first);
last = getMinimalKey(last);
@@ -787,6 +775,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* @param dbuilder
* @return true if index summary is loaded successfully from Summary.db file.
*/
+ @SuppressWarnings("resource")
public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
{
File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
@@ -841,9 +830,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
int expectedIndexInterval = getMinIndexInterval();
while (segments.hasNext())
{
- FileDataInput in = segments.next();
- try
+ String path = null;
+ try (FileDataInput in = segments.next())
{
+ path = in.getPath();
while (!in.isEOF())
{
ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
@@ -864,11 +854,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
catch (IOException e)
{
markSuspect();
- throw new CorruptSSTableException(e, in.getPath());
- }
- finally
- {
- FileUtils.closeQuietly(in);
+ throw new CorruptSSTableException(e, path);
}
}
@@ -901,10 +887,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
if (summariesFile.exists())
FileUtils.deleteWithConfirm(summariesFile);
- DataOutputStreamPlus oStream = null;
- try
+ try (DataOutputStreamPlus oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile));)
{
- oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile));
IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
ByteBufferUtil.writeWithLength(first.getKey(), oStream);
ByteBufferUtil.writeWithLength(last.getKey(), oStream);
@@ -919,10 +903,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
if (summariesFile.exists())
FileUtils.deleteWithConfirm(summariesFile);
}
- finally
- {
- FileUtils.closeQuietly(oStream);
- }
}
public void setReplaced()
@@ -1000,6 +980,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* @return a new SSTableReader
* @throws IOException
*/
+ @SuppressWarnings("resource")
public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
{
assert descriptor.version.hasSamplingLevel();
@@ -1479,9 +1460,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
while (segments.hasNext())
{
- FileDataInput in = segments.next();
- try
+ String path = null;
+ try (FileDataInput in = segments.next();)
{
+ path = in.getPath();
while (!in.isEOF())
{
ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
@@ -1495,11 +1477,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
catch (IOException e)
{
markSuspect();
- throw new CorruptSSTableException(e, in.getPath());
- }
- finally
- {
- FileUtils.closeQuietly(in);
+ throw new CorruptSSTableException(e, path);
}
}
@@ -2027,6 +2005,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
// get a new reference to the shared DescriptorTypeTidy for this sstable
+ @SuppressWarnings("resource")
public static Ref<DescriptorTypeTidy> get(SSTableReader sstable)
{
Descriptor desc = sstable.descriptor;
@@ -2038,7 +2017,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
final DescriptorTypeTidy tidy = new DescriptorTypeTidy(desc, sstable);
refc = new Ref<>(tidy, tidy);
Ref<?> ex = lookup.putIfAbsent(desc, refc);
- assert ex == null;
+ if (ex != null)
+ {
+ refc.close();
+ throw new AssertionError();
+ }
return refc;
}
}
@@ -2119,6 +2102,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
// get a new reference to the shared GlobalTidy for this sstable
+ @SuppressWarnings("resource")
public static Ref<GlobalTidy> get(SSTableReader sstable)
{
Descriptor descriptor = sstable.descriptor;
@@ -2128,7 +2112,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
final GlobalTidy tidy = new GlobalTidy(sstable);
refc = new Ref<>(tidy, tidy);
Ref<?> ex = lookup.putIfAbsent(descriptor, refc);
- assert ex == null;
+ if (ex != null)
+ {
+ refc.close();
+ throw new AssertionError();
+ }
return refc;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index baf6d51..3f375e7 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -177,9 +177,10 @@ public class BigTableReader extends SSTableReader
Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
while (segments.hasNext())
{
- FileDataInput in = segments.next();
- try
+ String path = null;
+ try (FileDataInput in = segments.next())
{
+ path = in.getPath();
while (!in.isEOF())
{
i++;
@@ -220,11 +221,12 @@ public class BigTableReader extends SSTableReader
if (logger.isTraceEnabled())
{
// expensive sanity check! see CASSANDRA-4687
- FileDataInput fdi = dfile.getSegment(indexEntry.position);
- DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
- if (!keyInDisk.equals(key))
- throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
- fdi.close();
+ try (FileDataInput fdi = dfile.getSegment(indexEntry.position))
+ {
+ DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
+ if (!keyInDisk.equals(key))
+ throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
+ }
}
// store exact match for the key
@@ -242,11 +244,7 @@ public class BigTableReader extends SSTableReader
catch (IOException e)
{
markSuspect();
- throw new CorruptSSTableException(e, in.getPath());
- }
- finally
- {
- FileUtils.closeQuietly(in);
+ throw new CorruptSSTableException(e, path);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 0f8f0d3..30b55a0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -285,6 +285,7 @@ public class BigTableWriter extends SSTableWriter
return link;
}
+ @SuppressWarnings("resource")
public SSTableReader openEarly()
{
// find the max (exclusive) readable key
@@ -318,6 +319,7 @@ public class BigTableWriter extends SSTableWriter
return openFinal(makeTmpLinks(), SSTableReader.OpenReason.EARLY);
}
+ @SuppressWarnings("resource")
private SSTableReader openFinal(Descriptor desc, SSTableReader.OpenReason openReason)
{
if (maxDataAge < 0)
@@ -507,15 +509,13 @@ public class BigTableWriter extends SSTableWriter
if (components.contains(Component.FILTER))
{
String path = descriptor.filenameFor(Component.FILTER);
- try
+ try (FileOutputStream fos = new FileOutputStream(path);
+ DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos))
{
// bloom filter
- FileOutputStream fos = new FileOutputStream(path);
- DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos);
FilterFactory.serialize(bf, stream);
stream.flush();
SyncUtil.sync(fos);
- stream.close();
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
index c51e595..7c9a344 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
@@ -95,6 +95,7 @@ class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDis
return fileToClose;
}
+ @SuppressWarnings("resource")
private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry)
throws IOException
{
@@ -170,6 +171,7 @@ class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDis
}
}
+ @SuppressWarnings("resource")
private void readIndexedColumns(CFMetaData metadata,
FileDataInput file,
SortedSet<CellName> columnNames,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index a3c4135..4362cee 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -200,20 +200,14 @@ public class DataIntegrityMetadata
public void writeFullChecksum(Descriptor descriptor)
{
File outFile = new File(descriptor.filenameFor(Component.DIGEST));
- BufferedWriter out = null;
- try
+ try (BufferedWriter out =Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8))
{
- out = Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8);
out.write(String.valueOf(fullChecksum.getValue()));
}
catch (IOException e)
{
throw new FSWriteError(e, outFile);
}
- finally
- {
- FileUtils.closeQuietly(out);
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index b556587..6ffc895 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -93,7 +93,7 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
}
@Override
- public void close() throws IOException
+ public void close()
{
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index db9391c..a5fa20b 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -54,6 +54,7 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
}
}
+ @SuppressWarnings("resource")
public FileDataInput getSegment(long position)
{
RandomAccessReader reader = FileCacheService.instance.get(cacheKey);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 302f054..278f55c 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -229,6 +229,10 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
public void deallocate()
{
+ //make idempotent
+ if (buffer == null)
+ return;
+
bufferOffset += buffer.position();
FileUtils.clean(buffer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index 1096b5f..aad3266 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -18,7 +18,6 @@
*/
package org.apache.cassandra.io.util;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -26,6 +25,7 @@ public class SafeMemoryWriter extends DataOutputBuffer
{
private SafeMemory memory;
+ @SuppressWarnings("resource")
public SafeMemoryWriter(long initialCapacity)
{
this(new SafeMemory(initialCapacity));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/CloudstackSnitch.java b/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
index afcd083..88c62e9 100644
--- a/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
+++ b/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
@@ -174,15 +174,12 @@ public class CloudstackSnitch extends AbstractNetworkTopologySnitch
String csEndpointFromLease(File lease) throws ConfigurationException
{
- BufferedReader reader = null;
-
- String line = null;
+ String line;
String endpoint = null;
Pattern identifierPattern = Pattern.compile("^[ \t]*option dhcp-server-identifier (.*);$");
- try
+ try (BufferedReader reader = new BufferedReader(new FileReader(lease)))
{
- reader = new BufferedReader(new FileReader(lease));
while ((line = reader.readLine()) != null)
{
@@ -194,14 +191,10 @@ public class CloudstackSnitch extends AbstractNetworkTopologySnitch
break;
}
}
- }
+ }
catch (Exception e)
{
throw new ConfigurationException("CloudstackSnitch cannot access lease file.");
- }
- finally
- {
- FileUtils.closeQuietly(reader);
}
if (endpoint == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
index 247eb00..8665816 100644
--- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
@@ -136,20 +136,14 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
HashMap<InetAddress, String[]> reloadedMap = new HashMap<InetAddress, String[]>();
Properties properties = new Properties();
- InputStream stream = null;
- try
+ try (InputStream stream = getClass().getClassLoader().getResourceAsStream(SNITCH_PROPERTIES_FILENAME))
{
- stream = getClass().getClassLoader().getResourceAsStream(SNITCH_PROPERTIES_FILENAME);
properties.load(stream);
}
catch (Exception e)
{
throw new ConfigurationException("Unable to read " + SNITCH_PROPERTIES_FILENAME, e);
}
- finally
- {
- FileUtils.closeQuietly(stream);
- }
for (Map.Entry<Object, Object> entry : properties.entrySet())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c54d5ee..293a27c 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -436,6 +436,7 @@ public final class MessagingService implements MessagingServiceMBean
listenGate.signalAll();
}
+ @SuppressWarnings("resource")
private List<ServerSocket> getServerSockets(InetAddress localEp) throws ConfigurationException
{
final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
@@ -471,6 +472,7 @@ public final class MessagingService implements MessagingServiceMBean
}
catch (SocketException e)
{
+ FileUtils.closeQuietly(socket);
throw new ConfigurationException("Insufficient permissions to setReuseAddress", e);
}
InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
@@ -480,6 +482,7 @@ public final class MessagingService implements MessagingServiceMBean
}
catch (BindException e)
{
+ FileUtils.closeQuietly(socket);
if (e.getMessage().contains("in use"))
throw new ConfigurationException(address + " is in use by another process. Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
else if (e.getMessage().contains("Cannot assign requested address"))
@@ -490,6 +493,7 @@ public final class MessagingService implements MessagingServiceMBean
}
catch (IOException e)
{
+ FileUtils.closeQuietly(socket);
throw new RuntimeException(e);
}
logger.info("Starting Messaging Service on port {}", DatabaseDescriptor.getStoragePort());
@@ -874,6 +878,7 @@ public final class MessagingService implements MessagingServiceMBean
this.server = server;
}
+ @SuppressWarnings("resource")
public void run()
{
while (!server.isClosed())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 78ef615..0eb8e02 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -366,6 +366,7 @@ public class OutboundTcpConnection extends Thread
}
}
+ @SuppressWarnings("resource")
private boolean connect()
{
if (logger.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/security/SSLFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/SSLFactory.java b/src/java/org/apache/cassandra/security/SSLFactory.java
index 956ba67..e9aa07d 100644
--- a/src/java/org/apache/cassandra/security/SSLFactory.java
+++ b/src/java/org/apache/cassandra/security/SSLFactory.java
@@ -99,6 +99,7 @@ public final class SSLFactory
return socket;
}
+ @SuppressWarnings("resource")
public static SSLContext createSSLContext(EncryptionOptions options, boolean buildTruststore) throws IOException
{
FileInputStream tsf = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index d350f4e..213edeb 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -341,6 +341,7 @@ public class ActiveRepairService
* @param parentRepairSession parent repair session ID
* @return Future result of all anti-compaction jobs.
*/
+ @SuppressWarnings("resource")
public ListenableFuture<List<Object>> doAntiCompaction(final UUID parentRepairSession, Collection<Range<Token>> successfulRanges)
{
assert parentRepairSession != null;
@@ -420,6 +421,7 @@ public class ActiveRepairService
this.sstableMap.put(cfId, existingSSTables);
}
+ @SuppressWarnings("resource")
public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID cfId)
{
Set<SSTableReader> sstables = sstableMap.get(cfId);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/service/FileCacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java
index 250e625..1e12310 100644
--- a/src/java/org/apache/cassandra/service/FileCacheService.java
+++ b/src/java/org/apache/cassandra/service/FileCacheService.java
@@ -143,6 +143,7 @@ public class FileCacheService
}
}
+ @SuppressWarnings("resource")
public void put(CacheKey cacheKey, RandomAccessReader instance)
{
int memoryUsed = memoryUsage.get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 78376a8..7801c3e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1025,8 +1025,7 @@ public class StorageProxy implements StorageProxyMBean
InetAddress target = iter.next();
// Add the other destinations of the same message as a FORWARD_HEADER entry
- DataOutputBuffer out = new DataOutputBuffer();
- try
+ try (DataOutputBuffer out = new DataOutputBuffer())
{
out.writeInt(targets.size() - 1);
while (iter.hasNext())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index ff461ab..f168880 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -60,9 +60,8 @@ public class PagingState
public ByteBuffer serialize()
{
- try
+ try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize()))
{
- DataOutputBuffer out = new DataOutputBufferFixed(serializedSize());
ByteBufferUtil.writeWithShortLength(partitionKey, out);
ByteBufferUtil.writeWithShortLength(cellName, out);
out.writeInt(remaining);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 6a33b40..681f61e 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -76,6 +76,7 @@ public class ConnectionHandler
*
* @throws IOException
*/
+ @SuppressWarnings("resource")
public void initiate() throws IOException
{
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
@@ -157,6 +158,7 @@ public class ConnectionHandler
protected abstract String name();
+ @SuppressWarnings("resource")
protected static DataOutputStreamPlus getWriteChannel(Socket socket) throws IOException
{
WritableByteChannel out = socket.getChannel();
@@ -175,6 +177,7 @@ public class ConnectionHandler
: in;
}
+ @SuppressWarnings("resource")
public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
{
StreamInitMessage message = new StreamInitMessage(
@@ -246,6 +249,7 @@ public class ConnectionHandler
return "STREAM-IN";
}
+ @SuppressWarnings("resource")
public void run()
{
try
@@ -315,6 +319,7 @@ public class ConnectionHandler
messageQueue.put(message);
}
+ @SuppressWarnings("resource")
public void run()
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 0f3ebb3..1a3980d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -80,6 +80,7 @@ public class StreamReader
* @return SSTable transferred
* @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
*/
+ @SuppressWarnings("resource")
public SSTableWriter read(ReadableByteChannel channel) throws IOException
{
logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 392dccd..106677c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -70,18 +70,19 @@ public class StreamWriter
public void write(DataOutputStreamPlus output) throws IOException
{
long totalSize = totalSize();
- RandomAccessReader file = sstable.openDataReader();
- ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
- ? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
- : null;
- transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
- // setting up data compression stream
- compressedOutput = new LZFOutputStream(output);
- long progress = 0L;
- try
+ try(RandomAccessReader file = sstable.openDataReader();
+ ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
+ ? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
+ : null;)
{
+ transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
+
+ // setting up data compression stream
+ compressedOutput = new LZFOutputStream(output);
+ long progress = 0L;
+
// stream each of the required sections of the file
for (Pair<Long, Long> section : sections)
{
@@ -109,12 +110,6 @@ public class StreamWriter
compressedOutput.flush();
}
}
- finally
- {
- // no matter what happens close file
- FileUtils.closeQuietly(file);
- FileUtils.closeQuietly(validator);
- }
}
protected long totalSize()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 89773ea..1936a94 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -61,6 +61,7 @@ public class CompressedStreamReader extends StreamReader
* @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
*/
@Override
+ @SuppressWarnings("resource")
public SSTableWriter read(ReadableByteChannel channel) throws IOException
{
logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 2fd7f63..144980c 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -55,14 +55,11 @@ public class CompressedStreamWriter extends StreamWriter
public void write(DataOutputStreamPlus out) throws IOException
{
long totalSize = totalSize();
- RandomAccessReader file = sstable.openDataReader();
- final ChannelProxy fc = file.getChannel();
-
- long progress = 0L;
- // calculate chunks to transfer. we want to send continuous chunks altogether.
- List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
- try
+ try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel())
{
+ long progress = 0L;
+ // calculate chunks to transfer. we want to send continuous chunks altogether.
+ List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
// stream each of the required sections of the file
for (final Pair<Long, Long> section : sections)
{
@@ -75,7 +72,7 @@ public class CompressedStreamWriter extends StreamWriter
final long bytesTransferredFinal = bytesTransferred;
final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
limiter.acquire(toTransfer);
- long lastWrite = out.applyToChannel( new Function<WritableByteChannel, Long>()
+ long lastWrite = out.applyToChannel(new Function<WritableByteChannel, Long>()
{
public Long apply(WritableByteChannel wbc)
{
@@ -88,11 +85,6 @@ public class CompressedStreamWriter extends StreamWriter
}
}
}
- finally
- {
- // no matter what happens close file
- FileUtils.closeQuietly(file);
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 1418651..fdfb32e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -36,6 +36,7 @@ public class IncomingFileMessage extends StreamMessage
{
public static Serializer<IncomingFileMessage> serializer = new Serializer<IncomingFileMessage>()
{
+ @SuppressWarnings("resource")
public IncomingFileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
{
DataInputStream input = new DataInputStream(Channels.newInputStream(in));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 4928039..e8b3f82 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -83,9 +83,11 @@ public class StreamInitMessage
try
{
int size = (int)StreamInitMessage.serializer.serializedSize(this, version);
- DataOutputBuffer buffer = new DataOutputBufferFixed(size);
- StreamInitMessage.serializer.serialize(this, buffer, version);
- bytes = buffer.getData();
+ try (DataOutputBuffer buffer = new DataOutputBufferFixed(size))
+ {
+ StreamInitMessage.serializer.serialize(this, buffer, version);
+ bytes = buffer.getData();
+ }
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
index 11fcc5e..de8df57 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
@@ -37,6 +37,7 @@ public class CustomTNonBlockingServer extends TNonblockingServer
}
@Override
+ @SuppressWarnings("resource")
protected boolean requestInvoke(FrameBuffer frameBuffer)
{
TNonblockingSocket socket = (TNonblockingSocket)((CustomFrameBuffer)frameBuffer).getTransport();
@@ -47,6 +48,7 @@ public class CustomTNonBlockingServer extends TNonblockingServer
public static class Factory implements TServerFactory
{
+ @SuppressWarnings("resource")
public TServer buildTServer(Args args)
{
if (DatabaseDescriptor.getClientEncryptionOptions().enabled)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
index e7584c9..a025004 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
@@ -84,6 +84,7 @@ public class CustomTThreadPoolServer extends TServer
this.args = args;
}
+ @SuppressWarnings("resource")
public void serve()
{
try
@@ -184,18 +185,16 @@ public class CustomTThreadPoolServer extends TServer
public void run()
{
TProcessor processor = null;
- TTransport inputTransport = null;
- TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
SocketAddress socket = null;
- try
+ try (TTransport inputTransport = inputTransportFactory_.getTransport(client_);
+ TTransport outputTransport = outputTransportFactory_.getTransport(client_))
{
socket = ((TCustomSocket) client_).getSocket().getRemoteSocketAddress();
ThriftSessionManager.instance.setCurrentSocket(socket);
processor = processorFactory_.getProcessor(client_);
- inputTransport = inputTransportFactory_.getTransport(client_);
- outputTransport = outputTransportFactory_.getTransport(client_);
+
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
// we check stopped first to make sure we're not supposed to be shutting
@@ -227,10 +226,7 @@ public class CustomTThreadPoolServer extends TServer
{
if (socket != null)
ThriftSessionManager.instance.connectionComplete(socket);
- if (inputTransport != null)
- inputTransport.close();
- if (outputTransport != null)
- outputTransport.close();
+
activeClients.decrementAndGet();
}
}
@@ -238,6 +234,7 @@ public class CustomTThreadPoolServer extends TServer
public static class Factory implements TServerFactory
{
+ @SuppressWarnings("resource")
public TServer buildTServer(Args args)
{
final InetSocketAddress addr = args.addr;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/SSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/SSLTransportFactory.java b/src/java/org/apache/cassandra/thrift/SSLTransportFactory.java
index d80d76e..ea74b94 100644
--- a/src/java/org/apache/cassandra/thrift/SSLTransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/SSLTransportFactory.java
@@ -52,6 +52,7 @@ public class SSLTransportFactory implements ITransportFactory
private String[] cipherSuites;
@Override
+ @SuppressWarnings("resource")
public TTransport openTransport(String host, int port) throws Exception
{
TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters(protocol, cipherSuites);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java b/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
index 63466b8..a430721 100644
--- a/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
+++ b/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
@@ -43,6 +43,7 @@ public class TCustomNonblockingServerSocket extends TNonblockingServerSocket
}
@Override
+ @SuppressWarnings("resource")
protected TNonblockingSocket acceptImpl() throws TTransportException
{
TNonblockingSocket tsocket = super.acceptImpl();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java b/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
index 477ef8c..8e27481 100644
--- a/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
+++ b/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
@@ -89,6 +89,7 @@ public class TCustomServerSocket extends TServerTransport
}
@Override
+ @SuppressWarnings("resource")
protected TCustomSocket acceptImpl() throws TTransportException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
index a4c6bb7..7bf0b96 100644
--- a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
@@ -34,6 +34,7 @@ public class TFramedTransportFactory implements ITransportFactory
private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
private int thriftFramedTransportSizeMb = 15; // 15Mb is the default for C* & Hadoop ConfigHelper
+ @SuppressWarnings("resource")
public TTransport openTransport(String host, int port) throws TTransportException
{
TSocket socket = new TSocket(host, port);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java b/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
index dd501ec..37bc440 100644
--- a/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
+++ b/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
@@ -67,6 +67,7 @@ public class THsHaDisruptorServer extends TDisruptorServer
public static class Factory implements TServerFactory
{
+ @SuppressWarnings("resource")
public TServer buildTServer(Args args)
{
if (DatabaseDescriptor.getClientEncryptionOptions().enabled)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 61edad2..bc460a1 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -225,8 +225,7 @@ public class SSTableExport
public static void enumeratekeys(Descriptor desc, PrintStream outs, CFMetaData metadata)
throws IOException
{
- KeyIterator iter = new KeyIterator(desc);
- try
+ try (KeyIterator iter = new KeyIterator(desc))
{
DecoratedKey lastKey = null;
while (iter.hasNext())
@@ -242,10 +241,6 @@ public class SSTableExport
checkStream(outs); // flushes
}
}
- finally
- {
- iter.close();
- }
}
/**
@@ -261,8 +256,8 @@ public class SSTableExport
public static void export(Descriptor desc, PrintStream outs, Collection<String> toExport, String[] excludes, CFMetaData metadata) throws IOException
{
SSTableReader sstable = SSTableReader.open(desc);
- RandomAccessReader dfile = sstable.openDataReader();
- try
+
+ try (RandomAccessReader dfile = sstable.openDataReader())
{
IPartitioner partitioner = sstable.partitioner;
@@ -305,10 +300,6 @@ public class SSTableExport
outs.println("\n]");
outs.flush();
}
- finally
- {
- dfile.close();
- }
}
// This is necessary to accommodate the test suite since you cannot open a Reader more
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
index 7b187ac..2fda6bd 100644
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableImport.java
@@ -299,54 +299,58 @@ public class SSTableImport
int importedKeys = 0;
long start = System.nanoTime();
- JsonParser parser = getParser(jsonFile);
-
- Object[] data = parser.readValueAs(new TypeReference<Object[]>(){});
+ Object[] data;
+ try (JsonParser parser = getParser(jsonFile))
+ {
+ data = parser.readValueAs(new TypeReference<Object[]>(){});
+ }
keyCountToImport = (keyCountToImport == null) ? data.length : keyCountToImport;
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE, 0);
-
- System.out.printf("Importing %s keys...%n", keyCountToImport);
-
- // sort by dk representation, but hold onto the hex version
- SortedMap<DecoratedKey,Map<?, ?>> decoratedKeys = new TreeMap<DecoratedKey,Map<?, ?>>();
- for (Object row : data)
+ try (SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE, 0))
{
- Map<?,?> rowAsMap = (Map<?, ?>)row;
- decoratedKeys.put(partitioner.decorateKey(getKeyValidator(columnFamily).fromString((String) rowAsMap.get("key"))), rowAsMap);
- }
+ System.out.printf("Importing %s keys...%n", keyCountToImport);
- for (Map.Entry<DecoratedKey, Map<?, ?>> row : decoratedKeys.entrySet())
- {
- if (row.getValue().containsKey("metadata"))
+ // sort by dk representation, but hold onto the hex version
+ SortedMap<DecoratedKey, Map<?, ?>> decoratedKeys = new TreeMap<DecoratedKey, Map<?, ?>>();
+
+ for (Object row : data)
{
- parseMeta((Map<?, ?>) row.getValue().get("metadata"), columnFamily, null);
+ Map<?, ?> rowAsMap = (Map<?, ?>) row;
+ decoratedKeys.put(partitioner.decorateKey(getKeyValidator(columnFamily).fromString((String) rowAsMap.get("key"))), rowAsMap);
}
- Object columns = row.getValue().get("cells");
- addColumnsToCF((List<?>) columns, columnFamily);
+ for (Map.Entry<DecoratedKey, Map<?, ?>> row : decoratedKeys.entrySet())
+ {
+ if (row.getValue().containsKey("metadata"))
+ {
+ parseMeta((Map<?, ?>) row.getValue().get("metadata"), columnFamily, null);
+ }
+ Object columns = row.getValue().get("cells");
+ addColumnsToCF((List<?>) columns, columnFamily);
- writer.append(row.getKey(), columnFamily);
- columnFamily.clear();
- importedKeys++;
+ writer.append(row.getKey(), columnFamily);
+ columnFamily.clear();
+
+ importedKeys++;
- long current = System.nanoTime();
+ long current = System.nanoTime();
- if (TimeUnit.NANOSECONDS.toSeconds(current - start) >= 5) // 5 secs.
- {
- System.out.printf("Currently imported %d keys.%n", importedKeys);
- start = current;
+ if (TimeUnit.NANOSECONDS.toSeconds(current - start) >= 5) // 5 secs.
+ {
+ System.out.printf("Currently imported %d keys.%n", importedKeys);
+ start = current;
+ }
+
+ if (keyCountToImport == importedKeys)
+ break;
}
- if (keyCountToImport == importedKeys)
- break;
+ writer.finish(true);
}
- writer.finish(true);
-
return importedKeys;
}
@@ -356,28 +360,29 @@ public class SSTableImport
int importedKeys = 0; // already imported keys count
long start = System.nanoTime();
- JsonParser parser = getParser(jsonFile);
-
- if (keyCountToImport == null)
+ try (JsonParser parser = getParser(jsonFile))
{
- keyCountToImport = 0;
- System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)");
- parser.nextToken(); // START_ARRAY
- while (parser.nextToken() != null)
+ if (keyCountToImport == null)
{
- parser.skipChildren();
- if (parser.getCurrentToken() == JsonToken.END_ARRAY)
- break;
+ keyCountToImport = 0;
+ System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)");
- keyCountToImport++;
+ parser.nextToken(); // START_ARRAY
+ while (parser.nextToken() != null)
+ {
+ parser.skipChildren();
+ if (parser.getCurrentToken() == JsonToken.END_ARRAY)
+ break;
+
+ keyCountToImport++;
+ }
}
+ System.out.printf("Importing %s keys...%n", keyCountToImport);
}
- System.out.printf("Importing %s keys...%n", keyCountToImport);
-
- parser = getParser(jsonFile); // renewing parser
- try (SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);)
+ try (JsonParser parser = getParser(jsonFile); // renewing parser
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);)
{
int lineNumber = 1;
DecoratedKey prevStoredKey = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index d32ef88..dd513b8 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -119,8 +119,7 @@ public class StandaloneScrubber
{
try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable))
{
- Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, true, !options.noValidate);
- try
+ try (Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, true, !options.noValidate))
{
scrubber.scrub();
}
@@ -132,10 +131,6 @@ public class StandaloneScrubber
throw t;
}
}
- finally
- {
- scrubber.close();
- }
// Remove the sstable (it's been copied by scrub and snapshotted)
sstable.markObsolete();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
index a4f3e80..f71f58d 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
@@ -100,8 +100,8 @@ public class StandaloneVerifier
{
try
{
- Verifier verifier = new Verifier(cfs, sstable, handler, true);
- try
+
+ try (Verifier verifier = new Verifier(cfs, sstable, handler, true))
{
verifier.verify(extended);
}
@@ -110,10 +110,6 @@ public class StandaloneVerifier
System.err.println(String.format("Error verifying %s: %s", sstable, cs.getMessage()));
hasFailed = true;
}
- finally
- {
- verifier.close();
- }
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
index 5fad3ea..6f80ac0 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
@@ -40,6 +40,7 @@ class BloomFilterSerializer implements ISerializer<BloomFilter>
return deserialize(in, false);
}
+ @SuppressWarnings("resource")
public BloomFilter deserialize(DataInput in, boolean offheap) throws IOException
{
int hashes = in.readInt();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index c20e33e..17edeb0 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -347,10 +347,8 @@ public class FBUtilities
public static String getReleaseVersionString()
{
- InputStream in = null;
- try
+ try (InputStream in = FBUtilities.class.getClassLoader().getResourceAsStream("org/apache/cassandra/config/version.properties"))
{
- in = FBUtilities.class.getClassLoader().getResourceAsStream("org/apache/cassandra/config/version.properties");
if (in == null)
{
return System.getProperty("cassandra.releaseVersion", "Unknown");
@@ -365,10 +363,6 @@ public class FBUtilities
logger.warn("Unable to load version.properties", e);
return "debug version";
}
- finally
- {
- FileUtils.closeQuietly(in);
- }
}
public static long timestampMicros()
@@ -718,10 +712,10 @@ public class FBUtilities
public static <T> byte[] serialize(T object, IVersionedSerializer<T> serializer, int version)
{
- try
+ int size = (int) serializer.serializedSize(object, version);
+
+ try (DataOutputBuffer buffer = new DataOutputBufferFixed(size))
{
- int size = (int) serializer.serializedSize(object, version);
- DataOutputBuffer buffer = new DataOutputBufferFixed(size);
serializer.serialize(object, buffer, version);
assert buffer.getLength() == size && buffer.getData().length == size
: String.format("Final buffer length %s to accommodate data size of %s (predicted %s) for %s",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/utils/FilterFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java
index 7cfc332..d77500c 100644
--- a/src/java/org/apache/cassandra/utils/FilterFactory.java
+++ b/src/java/org/apache/cassandra/utils/FilterFactory.java
@@ -78,6 +78,7 @@ public class FilterFactory
return createFilter(spec.K, numElements, spec.bucketsPerElement, offheap);
}
+ @SuppressWarnings("resource")
private static IFilter createFilter(int hash, long numElements, int bucketsPer, boolean offheap)
{
long numBits = (numElements * bucketsPer) + BITSET_EXCESS;