You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/05/28 00:00:53 UTC
[1/4] cassandra git commit: Add Static Analysis to warn on unsafe use
of Autocloseable instances
Repository: cassandra
Updated Branches:
refs/heads/trunk 5746f602f -> d91eb0116
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index 5b46700..5448390 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -61,9 +61,9 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
if (username != null && password != null)
builder = builder.withCredentials(username, password);
- try (Cluster cluster = builder.build())
+ try (Cluster cluster = builder.build(); Session session = cluster.connect())
{
- Session session = cluster.connect();
+
Metadata metadata = cluster.getMetadata();
setPartitioner(metadata.getPartitioner());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index ebabd79..3df3ea4 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -49,7 +49,7 @@ import static org.apache.cassandra.utils.Throwables.merge;
* Once the Ref.GlobalState has been completely released, the Tidy method is called and it removes the global reference
* to itself so it may also be collected.
*/
-public final class Ref<T> implements RefCounted<T>, AutoCloseable
+public final class Ref<T> implements RefCounted<T>
{
static final Logger logger = LoggerFactory.getLogger(Ref.class);
public static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/utils/concurrent/Refs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Refs.java b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
index dd65971..bbc672d 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Refs.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
@@ -89,6 +89,12 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
return ref != null;
}
+ public void relaseAllExcept(Collection<T> keep)
+ {
+ Collection<T> release = new ArrayList<>(references.keySet());
+ release.retainAll(keep);
+ release(release);
+ }
/**
* Release a retained Ref to all of the provided objects; if any is not held, an exception will be thrown
* @param release
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 02efa65..46c1bd0 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -125,6 +125,7 @@ public class OffHeapBitSet implements IBitSet
return type.sizeof((int) bytes.size()) + bytes.size();
}
+ @SuppressWarnings("resource")
public static OffHeapBitSet deserialize(DataInput in) throws IOException
{
long byteCount = in.readInt() * 8L;
[3/4] cassandra git commit: Add Static Analysis to warn on unsafe use
of Autocloseable instances
Posted by ja...@apache.org.
Add Static Analysis to warn on unsafe use of Autocloseable instances
Patch by tjake and carlyeks, reviewed by benedict for CASSANDRA-9431
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7aafe053
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7aafe053
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7aafe053
Branch: refs/heads/trunk
Commit: 7aafe053e7ffffc3b2e4ac1b2a444749df3dbbaa
Parents: 6fe6c99
Author: T Jake Luciani <ja...@apache.org>
Authored: Wed May 20 10:23:18 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Wed May 27 17:53:26 2015 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 44 +++-
eclipse_compiler.properties | 88 ++++++++
.../apache/cassandra/cache/AutoSavingCache.java | 1 +
.../cassandra/cache/SerializingCache.java | 5 +
.../apache/cassandra/db/BatchlogManager.java | 7 +-
.../org/apache/cassandra/db/ColumnFamily.java | 8 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 31 +--
src/java/org/apache/cassandra/db/Memtable.java | 30 +--
.../cassandra/db/MutationVerbHandler.java | 24 +-
.../apache/cassandra/db/RangeSliceReply.java | 5 +-
.../cassandra/db/SizeEstimatesRecorder.java | 17 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 8 +-
.../cassandra/db/commitlog/CommitLog.java | 8 +-
.../db/commitlog/CommitLogArchiver.java | 10 +-
.../db/commitlog/CommitLogReplayer.java | 16 +-
.../db/commitlog/CommitLogSegment.java | 1 +
.../compaction/AbstractCompactionStrategy.java | 1 +
.../db/compaction/CompactionManager.java | 222 +++++++++++--------
.../cassandra/db/compaction/CompactionTask.java | 71 +++---
.../DateTieredCompactionStrategy.java | 3 +
.../db/compaction/LazilyCompactedRow.java | 15 +-
.../compaction/LeveledCompactionStrategy.java | 2 +
.../cassandra/db/compaction/Scrubber.java | 3 +
.../SizeTieredCompactionStrategy.java | 3 +
.../cassandra/db/compaction/Upgrader.java | 12 +-
.../compaction/WrappingCompactionStrategy.java | 1 +
.../writers/DefaultCompactionWriter.java | 2 +
.../writers/MajorLeveledCompactionWriter.java | 3 +
.../writers/MaxSSTableSizeWriter.java | 3 +
.../SplittingSizeTieredCompactionWriter.java | 3 +
.../cassandra/db/marshal/CompositeType.java | 3 +-
.../apache/cassandra/gms/FailureDetector.java | 8 +-
.../hadoop/AbstractColumnFamilyInputFormat.java | 15 +-
.../hadoop/ColumnFamilyInputFormat.java | 1 +
.../hadoop/ColumnFamilyOutputFormat.java | 1 +
.../hadoop/ColumnFamilyRecordReader.java | 1 +
.../hadoop/ColumnFamilyRecordWriter.java | 1 +
.../apache/cassandra/hadoop/ConfigHelper.java | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 11 +-
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 48 ++--
.../cassandra/hadoop/pig/CqlNativeStorage.java | 21 +-
.../io/compress/CompressionMetadata.java | 65 +++---
.../cassandra/io/sstable/CQLSSTableWriter.java | 1 +
.../cassandra/io/sstable/IndexSummary.java | 14 +-
.../io/sstable/IndexSummaryBuilder.java | 1 +
.../io/sstable/IndexSummaryManager.java | 1 +
.../apache/cassandra/io/sstable/SSTable.java | 8 +-
.../cassandra/io/sstable/SSTableLoader.java | 5 +-
.../io/sstable/format/SSTableReader.java | 62 +++---
.../io/sstable/format/big/BigTableReader.java | 22 +-
.../io/sstable/format/big/BigTableWriter.java | 8 +-
.../format/big/SSTableNamesIterator.java | 2 +
.../io/util/DataIntegrityMetadata.java | 8 +-
.../cassandra/io/util/DataOutputBuffer.java | 2 +-
.../cassandra/io/util/PoolingSegmentedFile.java | 1 +
.../cassandra/io/util/RandomAccessReader.java | 4 +
.../cassandra/io/util/SafeMemoryWriter.java | 2 +-
.../cassandra/locator/CloudstackSnitch.java | 13 +-
.../cassandra/locator/PropertyFileSnitch.java | 8 +-
.../apache/cassandra/net/MessagingService.java | 5 +
.../cassandra/net/OutboundTcpConnection.java | 1 +
.../apache/cassandra/security/SSLFactory.java | 1 +
.../cassandra/service/ActiveRepairService.java | 2 +
.../cassandra/service/FileCacheService.java | 1 +
.../apache/cassandra/service/StorageProxy.java | 3 +-
.../cassandra/service/pager/PagingState.java | 3 +-
.../cassandra/streaming/ConnectionHandler.java | 5 +
.../cassandra/streaming/StreamReader.java | 1 +
.../cassandra/streaming/StreamWriter.java | 25 +--
.../compress/CompressedStreamReader.java | 1 +
.../compress/CompressedStreamWriter.java | 18 +-
.../streaming/messages/IncomingFileMessage.java | 1 +
.../streaming/messages/StreamInitMessage.java | 8 +-
.../thrift/CustomTNonBlockingServer.java | 2 +
.../thrift/CustomTThreadPoolServer.java | 15 +-
.../cassandra/thrift/SSLTransportFactory.java | 1 +
.../thrift/TCustomNonblockingServerSocket.java | 1 +
.../cassandra/thrift/TCustomServerSocket.java | 1 +
.../thrift/TFramedTransportFactory.java | 1 +
.../cassandra/thrift/THsHaDisruptorServer.java | 1 +
.../apache/cassandra/tools/SSTableExport.java | 15 +-
.../apache/cassandra/tools/SSTableImport.java | 97 ++++----
.../cassandra/tools/StandaloneScrubber.java | 7 +-
.../cassandra/tools/StandaloneVerifier.java | 8 +-
.../cassandra/utils/BloomFilterSerializer.java | 1 +
.../org/apache/cassandra/utils/FBUtilities.java | 14 +-
.../apache/cassandra/utils/FilterFactory.java | 1 +
.../utils/NativeSSTableLoaderClient.java | 4 +-
.../apache/cassandra/utils/concurrent/Ref.java | 2 +-
.../apache/cassandra/utils/concurrent/Refs.java | 6 +
.../cassandra/utils/obs/OffHeapBitSet.java | 1 +
92 files changed, 706 insertions(+), 533 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a5b3220..ad2845f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2
+ * Static Analysis to warn on unsafe use of Autocloseable instances (CASSANDRA-9431)
* Update commitlog archiving examples now that commitlog segments are
not recycled (CASSANDRA-9350)
* Extend Transactional API to sstable lifecycle management (CASSANDRA-8568)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index cbedf20..bb401ee 100644
--- a/build.xml
+++ b/build.xml
@@ -68,8 +68,8 @@
<property name="dist.dir" value="${build.dir}/dist"/>
<property name="tmp.dir" value="${java.io.tmpdir}"/>
- <property name="source.version" value="1.7"/>
- <property name="target.version" value="1.7"/>
+ <property name="source.version" value="1.7"/>
+ <property name="target.version" value="1.7"/>
<condition property="version" value="${base.version}">
<isset property="release"/>
@@ -114,6 +114,8 @@
<property name="jacoco.execfile" value="${jacoco.export.dir}/jacoco.exec" />
<property name="jacoco.version" value="0.7.1.201405082137"/>
+ <property name="ecj.version" value="4.4.2"/>
+
<condition property="maven-ant-tasks.jar.exists">
<available file="${build.dir}/maven-ant-tasks-${maven-ant-tasks.version}.jar" />
</condition>
@@ -1047,7 +1049,7 @@
</tar>
</target>
- <target name="release" depends="artifacts,rat-init"
+ <target name="release" depends="eclipse-warnings,artifacts,rat-init"
description="Create and QC release artifacts">
<checksum forceOverwrite="yes" todir="${build.dir}" fileext=".md5"
algorithm="MD5">
@@ -1406,7 +1408,7 @@
</target>
<target name="test-all"
- depends="test,long-test,test-compression,pig-test,test-clientutil-jar"
+ depends="eclipse-warnings,test,long-test,test-compression,pig-test,test-clientutil-jar"
description="Run all tests except for those under test-burn" />
<!-- Use JaCoCo ant extension without needing externally saved lib -->
@@ -1894,6 +1896,40 @@
<delete dir="build/eclipse-classes" />
</target>
+
+ <target name="eclipse-warnings" depends="build" description="Run eclipse compiler code analysis">
+ <property name="ecj.log.dir" value="${build.dir}/ecj" />
+ <property name="ecj.warnings.file" value="${ecj.log.dir}/eclipse_compiler_checks.txt"/>
+ <delete dir="${ecj.log.dir}" />
+ <mkdir dir="${ecj.log.dir}" />
+
+ <property name="ecj.properties" value="${basedir}/eclipse_compiler.properties" />
+
+ <echo message="Running Eclipse Code Analysis. Output logged to ${ecj.warnings.file}" />
+
+ <java
+ jar="${build.dir.lib}/jars/ecj-${ecj.version}.jar"
+ fork="true"
+ failonerror="true"
+ maxmemory="512m">
+ <arg value="-source"/>
+ <arg value="${source.version}" />
+ <arg value="-target"/>
+ <arg value="${target.version}" />
+ <arg value="-d" />
+ <arg value="none" />
+ <arg value="-proc:none" />
+ <arg value="-log" />
+ <arg value="${ecj.warnings.file}" />
+ <arg value="-properties" />
+ <arg value="${ecj.properties}" />
+ <arg value="-cp" />
+ <arg value="${toString:cassandra.classpath}" />
+ <arg value="${build.src.java}" />
+ </java>
+ </target>
+
+
<!-- Publish artifacts to Maven repositories -->
<target name="mvn-install"
depends="maven-declare-dependencies,artifacts,jar,sources-jar,javadoc-jar"
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/eclipse_compiler.properties
----------------------------------------------------------------------
diff --git a/eclipse_compiler.properties b/eclipse_compiler.properties
new file mode 100644
index 0000000..e1f2802
--- /dev/null
+++ b/eclipse_compiler.properties
@@ -0,0 +1,88 @@
+# These options come from
+# http://grepcode.com/file/repo1.maven.org/maven2/org.eclipse.jdt.core.compiler/ecj/4.2.1/org/eclipse/jdt/internal/compiler/impl/CompilerOptions.java#CompilerOptions
+
+#Look for important errors
+#
+# Autoclosables not in try-with-references
+org.eclipse.jdt.core.compiler.problem.explicitlyClosedAutoCloseable=error
+org.eclipse.jdt.core.compiler.problem.potentiallyUnclosedCloseable=error
+org.eclipse.jdt.core.compiler.problem.unclosedCloseable=ignore
+#Ignore and disable all other checks too keep the logs clean
+
+
+org.eclipse.jdt.core.compiler.problem.annotationSuperInterface=ignore
+org.eclipse.jdt.core.compiler.problem.autoboxing=ignore
+org.eclipse.jdt.core.compiler.problem.comparingIdentical=ignore
+org.eclipse.jdt.core.compiler.problem.deadCode=ignore
+org.eclipse.jdt.core.compiler.problem.deprecation=ignore
+org.eclipse.jdt.core.compiler.problem.deprecationInDeprecatedCode=disabled
+org.eclipse.jdt.core.compiler.problem.deprecationWhenOverridingDeprecatedMethod=disabled
+org.eclipse.jdt.core.compiler.problem.discouragedReference=ignore
+org.eclipse.jdt.core.compiler.problem.emptyStatement=ignore
+org.eclipse.jdt.core.compiler.problem.fallthroughCase=ignore
+org.eclipse.jdt.core.compiler.problem.fatalOptionalError=disabled
+org.eclipse.jdt.core.compiler.problem.fieldHiding=ignore
+org.eclipse.jdt.core.compiler.problem.finalParameterBound=ignore
+org.eclipse.jdt.core.compiler.problem.finallyBlockNotCompletingNormally=ignore
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=ignore
+org.eclipse.jdt.core.compiler.problem.hiddenCatchBlock=ignore
+org.eclipse.jdt.core.compiler.problem.includeNullInfoFromAsserts=disabled
+org.eclipse.jdt.core.compiler.problem.incompatibleNonInheritedInterfaceMethod=ignore
+org.eclipse.jdt.core.compiler.problem.incompleteEnumSwitch=ignore
+org.eclipse.jdt.core.compiler.problem.indirectStaticAccess=ignore
+org.eclipse.jdt.core.compiler.problem.localVariableHiding=ignore
+org.eclipse.jdt.core.compiler.problem.methodWithConstructorName=ignore
+org.eclipse.jdt.core.compiler.problem.missingDefaultCase=ignore
+org.eclipse.jdt.core.compiler.problem.missingDeprecatedAnnotation=ignore
+org.eclipse.jdt.core.compiler.problem.missingEnumCaseDespiteDefault=disabled
+org.eclipse.jdt.core.compiler.problem.missingHashCodeMethod=ignore
+org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotation=ignore
+org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotationForInterfaceMethodImplementation=disabled
+org.eclipse.jdt.core.compiler.problem.missingSerialVersion=ignore
+org.eclipse.jdt.core.compiler.problem.missingSynchronizedOnInheritedMethod=ignore
+org.eclipse.jdt.core.compiler.problem.noEffectAssignment=ignore
+org.eclipse.jdt.core.compiler.problem.noImplicitStringConversion=ignore
+org.eclipse.jdt.core.compiler.problem.nonExternalizedStringLiteral=ignore
+org.eclipse.jdt.core.compiler.problem.nullAnnotationInferenceConflict=error
+org.eclipse.jdt.core.compiler.problem.nullReference=ignore
+org.eclipse.jdt.core.compiler.problem.nullSpecViolation=error
+org.eclipse.jdt.core.compiler.problem.nullUncheckedConversion=ignore
+org.eclipse.jdt.core.compiler.problem.overridingPackageDefaultMethod=ignore
+org.eclipse.jdt.core.compiler.problem.parameterAssignment=ignore
+org.eclipse.jdt.core.compiler.problem.possibleAccidentalBooleanAssignment=ignore
+org.eclipse.jdt.core.compiler.problem.potentialNullReference=ignore
+org.eclipse.jdt.core.compiler.problem.rawTypeReference=ignore
+org.eclipse.jdt.core.compiler.problem.redundantNullAnnotation=ignore
+org.eclipse.jdt.core.compiler.problem.redundantNullCheck=ignore
+org.eclipse.jdt.core.compiler.problem.redundantSpecificationOfTypeArguments=ignore
+org.eclipse.jdt.core.compiler.problem.redundantSuperinterface=ignore
+org.eclipse.jdt.core.compiler.problem.reportMethodCanBePotentiallyStatic=ignore
+org.eclipse.jdt.core.compiler.problem.reportMethodCanBeStatic=ignore
+org.eclipse.jdt.core.compiler.problem.specialParameterHidingField=disabled
+org.eclipse.jdt.core.compiler.problem.staticAccessReceiver=ignore
+org.eclipse.jdt.core.compiler.problem.suppressOptionalErrors=enabled
+org.eclipse.jdt.core.compiler.problem.suppressWarnings=enabled
+org.eclipse.jdt.core.compiler.problem.syntheticAccessEmulation=ignore
+org.eclipse.jdt.core.compiler.problem.typeParameterHiding=ignore
+org.eclipse.jdt.core.compiler.problem.unavoidableGenericTypeProblems=disabled
+org.eclipse.jdt.core.compiler.problem.uncheckedTypeOperation=ignore
+org.eclipse.jdt.core.compiler.problem.undocumentedEmptyBlock=ignore
+org.eclipse.jdt.core.compiler.problem.unhandledWarningToken=ignore
+org.eclipse.jdt.core.compiler.problem.unnecessaryElse=ignore
+org.eclipse.jdt.core.compiler.problem.unnecessaryTypeCheck=ignore
+org.eclipse.jdt.core.compiler.problem.unqualifiedFieldAccess=ignore
+org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownException=ignore
+org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionExemptExceptionAndThrowable=disabled
+org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionIncludeDocCommentReference=disabled
+org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionWhenOverriding=disabled
+org.eclipse.jdt.core.compiler.problem.unusedImport=ignore
+org.eclipse.jdt.core.compiler.problem.unusedLabel=ignore
+org.eclipse.jdt.core.compiler.problem.unusedLocal=ignore
+org.eclipse.jdt.core.compiler.problem.unusedObjectAllocation=ignore
+org.eclipse.jdt.core.compiler.problem.unusedParameter=ignore
+org.eclipse.jdt.core.compiler.problem.unusedParameterIncludeDocCommentReference=disabled
+org.eclipse.jdt.core.compiler.problem.unusedParameterWhenImplementingAbstract=disabled
+org.eclipse.jdt.core.compiler.problem.unusedParameterWhenOverridingConcrete=disabled
+org.eclipse.jdt.core.compiler.problem.unusedPrivateMember=ignore
+org.eclipse.jdt.core.compiler.problem.unusedWarningToken=ignore
+org.eclipse.jdt.core.compiler.problem.varargsArgumentNeedCast=ignore
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index b381224..a204a18 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -227,6 +227,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate));
}
+ @SuppressWarnings("resource")
public void saveCache()
{
logger.debug("Deleting old {} files.", cacheType);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index 911b500..0e38922 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -155,6 +155,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
map.clear();
}
+ @SuppressWarnings("resource")
public V get(K key)
{
RefCountedMemory mem = map.get(key);
@@ -172,6 +173,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
}
}
+ @SuppressWarnings("resource")
public void put(K key, V value)
{
RefCountedMemory mem = serialize(value);
@@ -193,6 +195,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
old.unreference();
}
+ @SuppressWarnings("resource")
public boolean putIfAbsent(K key, V value)
{
RefCountedMemory mem = serialize(value);
@@ -216,6 +219,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
return old == null;
}
+ @SuppressWarnings("resource")
public boolean replace(K key, V oldToReplace, V value)
{
// if there is no old value in our map, we fail
@@ -259,6 +263,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
public void remove(K key)
{
+ @SuppressWarnings("resource")
RefCountedMemory mem = map.remove(key);
if (mem != null)
mem.unreference();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index f5137fd..dd84ac8 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -148,20 +148,17 @@ public class BatchlogManager implements BatchlogManagerMBean
private static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
{
- DataOutputBuffer buf = new DataOutputBuffer();
-
- try
+ try (DataOutputBuffer buf = new DataOutputBuffer())
{
buf.writeInt(mutations.size());
for (Mutation mutation : mutations)
Mutation.serializer.serialize(mutation, buf, version);
+ return buf.buffer();
}
catch (IOException e)
{
throw new AssertionError(); // cannot happen.
}
-
- return buf.buffer();
}
private void replayAllFailedBatches() throws ExecutionException, InterruptedException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 006ced7..a7243a2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -521,9 +521,11 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
public ByteBuffer toBytes()
{
- DataOutputBuffer out = new DataOutputBuffer();
- serializer.serialize(this, out, MessagingService.current_version);
- return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+ try (DataOutputBuffer out = new DataOutputBuffer())
+ {
+ serializer.serialize(this, out, MessagingService.current_version);
+ return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b41ac75..63ffb16 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1795,6 +1795,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return repairedSSTables;
}
+ @SuppressWarnings("resource")
public RefViewFragment selectAndReference(Function<View, List<SSTableReader>> filter)
{
while (true)
@@ -1966,6 +1967,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*
* @param range The range of keys and columns within those keys to fetch
*/
+ @SuppressWarnings("resource")
private AbstractScanIterator getSequentialIterator(final DataRange range, long now)
{
assert !(range.keyRange() instanceof Range) || !((Range<?>)range.keyRange()).isWrapAround() || range.keyRange().right.isMinimum() : range.keyRange();
@@ -1980,24 +1982,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
protected Row computeNext()
{
- // pull a row out of the iterator
- if (!iterator.hasNext())
- return endOfData();
+ while (true)
+ {
+ // pull a row out of the iterator
+ if (!iterator.hasNext())
+ return endOfData();
- Row current = iterator.next();
- DecoratedKey key = current.key;
+ Row current = iterator.next();
+ DecoratedKey key = current.key;
- if (!range.stopKey().isMinimum() && range.stopKey().compareTo(key) < 0)
- return endOfData();
+ if (!range.stopKey().isMinimum() && range.stopKey().compareTo(key) < 0)
+ return endOfData();
- // skipping outside of assigned range
- if (!range.contains(key))
- return computeNext();
+ // skipping outside of assigned range
+ if (!range.contains(key))
+ continue;
- if (logger.isTraceEnabled())
- logger.trace("scanned {}", metadata.getKeyValidator().getString(key.getKey()));
+ if (logger.isTraceEnabled())
+ logger.trace("scanned {}", metadata.getKeyValidator().getString(key.getKey()));
- return current;
+ return current;
+ }
}
public void close() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 55b0bfe..ccf92be 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -423,19 +423,21 @@ public class Memtable implements Comparable<Memtable>
private static int estimateRowOverhead(final int count)
{
// calculate row overhead
- final OpOrder.Group group = new OpOrder().start();
- int rowOverhead;
- MemtableAllocator allocator = MEMORY_POOL.newAllocator();
- ConcurrentNavigableMap<RowPosition, Object> rows = new ConcurrentSkipListMap<>();
- final Object val = new Object();
- for (int i = 0 ; i < count ; i++)
- rows.put(allocator.clone(new BufferDecoratedKey(new LongToken(i), ByteBufferUtil.EMPTY_BYTE_BUFFER), group), val);
- double avgSize = ObjectSizes.measureDeep(rows) / (double) count;
- rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
- rowOverhead -= ObjectSizes.measureDeep(new LongToken(0));
- rowOverhead += AtomicBTreeColumns.EMPTY_SIZE;
- allocator.setDiscarding();
- allocator.setDiscarded();
- return rowOverhead;
+ try (final OpOrder.Group group = new OpOrder().start())
+ {
+ int rowOverhead;
+ MemtableAllocator allocator = MEMORY_POOL.newAllocator();
+ ConcurrentNavigableMap<RowPosition, Object> rows = new ConcurrentSkipListMap<>();
+ final Object val = new Object();
+ for (int i = 0; i < count; i++)
+ rows.put(allocator.clone(new BufferDecoratedKey(new LongToken(i), ByteBufferUtil.EMPTY_BYTE_BUFFER), group), val);
+ double avgSize = ObjectSizes.measureDeep(rows) / (double) count;
+ rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
+ rowOverhead -= ObjectSizes.measureDeep(new LongToken(0));
+ rowOverhead += AtomicBTreeColumns.EMPTY_SIZE;
+ allocator.setDiscarding();
+ allocator.setDiscarded();
+ return rowOverhead;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 92bfdb5..3baa93e 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -59,18 +59,20 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
*/
private void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
{
- DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes));
- int size = in.readInt();
-
- // tell the recipients who to send their ack to
- MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
- // Send a message to each of the addresses on our Forward List
- for (int i = 0; i < size; i++)
+ try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes)))
{
- InetAddress address = CompactEndpointSerializationHelper.deserialize(in);
- int id = in.readInt();
- Tracing.trace("Enqueuing forwarded write to {}", address);
- MessagingService.instance().sendOneWay(message, id, address);
+ int size = in.readInt();
+
+ // tell the recipients who to send their ack to
+ MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
+ // Send a message to each of the addresses on our Forward List
+ for (int i = 0; i < size; i++)
+ {
+ InetAddress address = CompactEndpointSerializationHelper.deserialize(in);
+ int id = in.readInt();
+ Tracing.trace("Enqueuing forwarded write to {}", address);
+ MessagingService.instance().sendOneWay(message, id, address);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/RangeSliceReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceReply.java b/src/java/org/apache/cassandra/db/RangeSliceReply.java
index 5964ea8..ed1f523 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceReply.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceReply.java
@@ -57,7 +57,10 @@ public class RangeSliceReply
public static RangeSliceReply read(byte[] body, int version) throws IOException
{
- return serializer.deserialize(new DataInputStream(new FastByteArrayInputStream(body)), version);
+ try (DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(body)))
+ {
+ return serializer.deserialize(dis, version);
+ }
}
private static class RangeSliceReplySerializer implements IVersionedSerializer<RangeSliceReply>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index b0c114a..c68109c 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -82,6 +82,7 @@ public class SizeEstimatesRecorder extends MigrationListener implements Runnable
}
}
+ @SuppressWarnings("resource")
private void recordSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> localRanges)
{
// for each local primary range, estimate (crudely) mean partition size and partitions count.
@@ -90,22 +91,24 @@ public class SizeEstimatesRecorder extends MigrationListener implements Runnable
{
// filter sstables that have partitions in this range.
Refs<SSTableReader> refs = null;
- while (refs == null)
- {
- ColumnFamilyStore.ViewFragment view = table.select(table.viewFilter(Range.makeRowRange(range)));
- refs = Refs.tryRef(view.sstables);
- }
-
long partitionsCount, meanPartitionSize;
+
try
{
+ while (refs == null)
+ {
+ ColumnFamilyStore.ViewFragment view = table.select(table.viewFilter(Range.makeRowRange(range)));
+ refs = Refs.tryRef(view.sstables);
+ }
+
// calculate the estimates.
partitionsCount = estimatePartitionsCount(refs, range);
meanPartitionSize = estimateMeanPartitionSize(refs);
}
finally
{
- refs.release();
+ if (refs != null)
+ refs.release();
}
estimates.put(range, Pair.create(partitionsCount, meanPartitionSize));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 67a3162..9956728 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -437,17 +437,16 @@ public final class SystemKeyspace
private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
- DataOutputBuffer out = new DataOutputBuffer();
- try
+ try (DataOutputBuffer out = new DataOutputBuffer())
{
ReplayPosition.serializer.serialize(position, out);
out.writeLong(truncatedAt);
+ return Collections.singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength()));
}
catch (IOException e)
{
throw new RuntimeException(e);
}
- return Collections.singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength()));
}
public static ReplayPosition getTruncatedPosition(UUID cfId)
@@ -1116,9 +1115,8 @@ public final class SystemKeyspace
private static ByteBuffer rangeToBytes(Range<Token> range)
{
- try
+ try (DataOutputBuffer out = new DataOutputBuffer())
{
- DataOutputBuffer out = new DataOutputBuffer();
Range.tokenSerializer.serialize(range, out, MessagingService.VERSION_22);
return out.buffer();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index a8dda28..a81145d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -257,12 +257,10 @@ public class CommitLog implements CommitLogMBean
}
Allocation alloc = allocator.allocate(mutation, (int) totalSize);
- try
+ ICRC32 checksum = CRC32Factory.instance.create();
+ final ByteBuffer buffer = alloc.getBuffer();
+ try (BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer))
{
- ICRC32 checksum = CRC32Factory.instance.create();
- final ByteBuffer buffer = alloc.getBuffer();
- BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer);
-
// checksummed length
dos.writeInt((int) size);
checksum.update(buffer, buffer.position() - 4, 4);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 27abae3..02072de 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -63,11 +63,8 @@ public class CommitLogArchiver
public CommitLogArchiver()
{
Properties commitlog_commands = new Properties();
- InputStream stream = null;
- try
+ try (InputStream stream = getClass().getClassLoader().getResourceAsStream("commitlog_archiving.properties"))
{
- stream = getClass().getClassLoader().getResourceAsStream("commitlog_archiving.properties");
-
if (stream == null)
{
logger.debug("No commitlog_archiving properties found; archive + pitr will be disabled");
@@ -113,10 +110,7 @@ public class CommitLogArchiver
{
throw new RuntimeException("Unable to load commitlog_archiving.properties", e);
}
- finally
- {
- FileUtils.closeQuietly(stream);
- }
+
}
public void maybeArchive(final CommitLogSegment segment)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 6f9039d..a59e70e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -268,6 +268,7 @@ public class CommitLogReplayer
}
}
+ @SuppressWarnings("resource")
public void recover(File file) throws IOException
{
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
@@ -340,6 +341,7 @@ public class CommitLogReplayer
FileDataInput sectionReader = reader;
if (compressor != null)
+ {
try
{
int start = (int) reader.getFilePointer();
@@ -363,6 +365,7 @@ public class CommitLogReplayer
logger.error("Unexpected exception decompressing section {}", e);
continue;
}
+ }
if (!replaySyncSection(sectionReader, replayEnd, desc))
break;
@@ -469,9 +472,9 @@ public class CommitLogReplayer
void replayMutation(byte[] inputBuffer, int size,
final long entryLocation, final CommitLogDescriptor desc) throws IOException
{
- FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+
final Mutation mutation;
- try
+ try (FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size))
{
mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
desc.getMessagingVersion(),
@@ -499,15 +502,12 @@ public class CommitLogReplayer
{
JVMStabilityInspector.inspectThrowable(t);
File f = File.createTempFile("mutation", "dat");
- DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
- try
+
+ try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
{
out.write(inputBuffer, 0, size);
}
- finally
- {
- out.close();
- }
+
String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
f.getAbsolutePath());
logger.error(st, t);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index d04690d..ee160c3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -160,6 +160,7 @@ public abstract class CommitLogSegment
* Allocate space in this buffer for the provided mutation, and return the allocated Allocation object.
* Returns null if there is not enough space in this segment, and a new segment is needed.
*/
+ @SuppressWarnings("resource") //we pass the op order around
Allocation allocate(Mutation mutation, int size)
{
final OpOrder.Group opGroup = appendOrder.start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 38107c0..97e7041 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -268,6 +268,7 @@ public abstract class AbstractCompactionStrategy
* allow for a more memory efficient solution if we know the sstable don't overlap (see
* LeveledCompactionStrategy for instance).
*/
+ @SuppressWarnings("resource")
public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
RateLimiter limiter = CompactionManager.instance.getRateLimiter();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2079325..ffed554 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -42,6 +42,7 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
@@ -70,9 +71,11 @@ import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.repair.Validator;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -243,6 +246,7 @@ public class CompactionManager implements CompactionManagerMBean
}
}
+ @SuppressWarnings("resource")
private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, OperationType operationType) throws ExecutionException, InterruptedException
{
try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType);)
@@ -259,7 +263,8 @@ public class CompactionManager implements CompactionManagerMBean
}
Iterable<SSTableReader> sstables = operation.filterSSTables(compacting.originals());
- List<Future<Object>> futures = new ArrayList<>();
+ List<Pair<LifecycleTransaction,Future<Object>>> futures = new ArrayList<>();
+
for (final SSTableReader sstable : sstables)
{
@@ -270,7 +275,7 @@ public class CompactionManager implements CompactionManagerMBean
}
final LifecycleTransaction txn = compacting.split(singleton(sstable));
- futures.add(executor.submit(new Callable<Object>()
+ futures.add(Pair.create(txn,executor.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
@@ -278,13 +283,37 @@ public class CompactionManager implements CompactionManagerMBean
operation.execute(txn);
return this;
}
- }));
+ })));
}
assert compacting.originals().isEmpty();
- for (Future<Object> f : futures)
- f.get();
+
+ //Collect all exceptions
+ Exception exception = null;
+
+ for (Pair<LifecycleTransaction, Future<Object>> f : futures)
+ {
+ try
+ {
+ f.right.get();
+ }
+ catch (InterruptedException | ExecutionException e)
+ {
+ if (exception == null)
+ exception = new Exception();
+
+ exception.addSuppressed(e);
+ }
+ finally
+ {
+ f.left.close();
+ }
+ }
+
+ if (exception != null)
+ Throwables.propagate(exception);
+
return AllSSTableOpStatus.SUCCESSFUL;
}
}
@@ -407,6 +436,7 @@ public class CompactionManager implements CompactionManagerMBean
{
Runnable runnable = new WrappedRunnable() {
@Override
+ @SuppressWarnings("resource")
public void runMayThrow() throws Exception
{
LifecycleTransaction modifier = null;
@@ -427,6 +457,7 @@ public class CompactionManager implements CompactionManagerMBean
if (executor.isShutdown())
{
logger.info("Compaction executor has shut down, not submitting anticompaction");
+ sstables.release();
return Futures.immediateCancelledFuture();
}
@@ -659,35 +690,35 @@ public class CompactionManager implements CompactionManagerMBean
private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData) throws IOException
{
- Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, false, checkData);
+ CompactionInfo.Holder scrubInfo = null;
- CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
- metrics.beginCompaction(scrubInfo);
- try
+ try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, false, checkData))
{
+ scrubInfo = scrubber.getScrubInfo();
+ metrics.beginCompaction(scrubInfo);
scrubber.scrub();
}
finally
{
- scrubber.close();
- metrics.finishCompaction(scrubInfo);
+ if (scrubInfo != null)
+ metrics.finishCompaction(scrubInfo);
}
}
private void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean extendedVerify) throws IOException
{
- Verifier verifier = new Verifier(cfs, sstable, false);
+ CompactionInfo.Holder verifyInfo = null;
- CompactionInfo.Holder verifyInfo = verifier.getVerifyInfo();
- metrics.beginCompaction(verifyInfo);
- try
+ try (Verifier verifier = new Verifier(cfs, sstable, false))
{
+ verifyInfo = verifier.getVerifyInfo();
+ metrics.beginCompaction(verifyInfo);
verifier.verify(extendedVerify);
}
finally
{
- verifier.close();
- metrics.finishCompaction(verifyInfo);
+ if (verifyInfo != null)
+ metrics.finishCompaction(verifyInfo);
}
}
@@ -798,10 +829,11 @@ public class CompactionManager implements CompactionManagerMBean
if (ci.isStopRequested())
throw new CompactionInterruptedException(ci.getCompactionInfo());
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
- row = cleanupStrategy.cleanup(row);
+ @SuppressWarnings("resource")
+ SSTableIdentityIterator row = cleanupStrategy.cleanup((SSTableIdentityIterator) scanner.next());
if (row == null)
continue;
+ @SuppressWarnings("resource")
AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row));
if (writer.append(compactedRow) != null)
totalkeysWritten++;
@@ -991,35 +1023,40 @@ public class CompactionManager implements CompactionManagerMBean
if (!cfs.isValid())
return;
- Refs<SSTableReader> sstables = null;
- try
- {
+ String snapshotName = validator.desc.sessionId.toString();
+ boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
- String snapshotName = validator.desc.sessionId.toString();
- int gcBefore;
- boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
- if (isSnapshotValidation)
- {
- // If there is a snapshot created for the session then read from there.
- // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we
- // are supposed to validate.
- sstables = cfs.getSnapshotSSTableReader(snapshotName);
+ int gcBefore;
+ if (isSnapshotValidation)
+ {
+ // If there is a snapshot created for the session then read from there.
+ // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we
+ // are supposed to validate.
+ try (Refs<SSTableReader> sstables = cfs.getSnapshotSSTableReader(snapshotName))
+ {
// Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
// this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
// time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
// 'as good as in the non-snapshot' case)
gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
+
+ buildMerkleTree(cfs, sstables, validator, gcBefore);
+
+ // review comment: should this be in a try/finally? it was previously
+ cfs.clearSnapshot(snapshotName);
}
- else
+ }
+ else
+ {
+ // flush first so everyone is validating data that is as similar as possible
+ StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+ try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
{
- // flush first so everyone is validating data that is as similar as possible
- StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
- ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
- ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES);
+ Refs<SSTableReader> refs = sstableCandidates.refs;
Set<SSTableReader> sstablesToValidate = new HashSet<>();
-
for (SSTableReader sstable : sstableCandidates.sstables)
{
if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
@@ -1036,83 +1073,75 @@ public class CompactionManager implements CompactionManagerMBean
throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
}
- sstables = Refs.tryRef(sstablesToValidate);
- if (sstables == null)
- {
- logger.error("Could not reference sstables");
- throw new RuntimeException("Could not reference sstables");
- }
- sstableCandidates.release();
+ refs.relaseAllExcept(sstablesToValidate);
prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
if (validator.gcBefore > 0)
gcBefore = validator.gcBefore;
else
gcBefore = getDefaultGcBefore(cfs);
- }
- // Create Merkle tree suitable to hold estimated partitions for given range.
- // We blindly assume that partition is evenly distributed on all sstables for now.
- long numPartitions = 0;
- for (SSTableReader sstable : sstables)
- {
- numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
+
+ buildMerkleTree(cfs, refs, validator, gcBefore);
}
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
- MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+ }
+ }
- long start = System.nanoTime();
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
+ private void buildMerkleTree(ColumnFamilyStore cfs, Refs<SSTableReader> sstables, Validator validator, int gcBefore)
+ {
+ // Create Merkle tree suitable to hold estimated partitions for given range.
+ // We blindly assume that partition is evenly distributed on all sstables for now.
+ long numPartitions = 0;
+ for (SSTableReader sstable : sstables)
+ {
+ numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
+ }
+ // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+ int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+ MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+
+ long start = System.nanoTime();
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
+ {
+ CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
+ metrics.beginCompaction(ci);
+ try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator();)
{
- CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- metrics.beginCompaction(ci);
- try
- {
- // validate the CF as we iterate over it
- validator.prepare(cfs, tree);
- while (iter.hasNext())
- {
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
- AbstractCompactedRow row = iter.next();
- validator.add(row);
- }
- validator.complete();
- }
- finally
+ // validate the CF as we iterate over it
+ validator.prepare(cfs, tree);
+ while (iter.hasNext())
{
- if (isSnapshotValidation)
- {
- cfs.clearSnapshot(snapshotName);
- }
-
- metrics.finishCompaction(ci);
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+ @SuppressWarnings("resource")
+ AbstractCompactedRow row = iter.next();
+ validator.add(row);
}
+ validator.complete();
}
-
- if (logger.isDebugEnabled())
+ catch (Exception e)
+ {
+ Throwables.propagate(e);
+ }
+ finally
{
- // MT serialize may take time
- long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
- duration,
- depth,
- numPartitions,
- MerkleTree.serializer.serializedSize(tree, 0),
- validator.desc);
+ metrics.finishCompaction(ci);
}
}
- finally
+
+ if (logger.isDebugEnabled())
{
- if (sstables != null)
- sstables.release();
+ // MT serialize may take time
+ long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+ duration,
+ depth,
+ numPartitions,
+ MerkleTree.serializer.serializedSize(tree, 0),
+ validator.desc);
}
}
-
-
/**
* Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
* will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted
@@ -1187,12 +1216,14 @@ public class CompactionManager implements CompactionManagerMBean
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
- Iterator<AbstractCompactedRow> iter = ci.iterator();
metrics.beginCompaction(ci);
try
{
+ @SuppressWarnings("resource")
+ CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
while (iter.hasNext())
{
+ @SuppressWarnings("resource")
AbstractCompactedRow row = iter.next();
// if current range from sstable is repaired, save it into the new repaired sstable
if (Range.isInRanges(row.key.getToken(), ranges))
@@ -1315,6 +1346,7 @@ public class CompactionManager implements CompactionManagerMBean
private static class ValidationCompactionIterable extends CompactionIterable
{
+ @SuppressWarnings("resource")
public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ISSTableScanner> scanners, int gcBefore)
{
super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore), DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index e593ec0..7089016 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -17,9 +17,9 @@
*/
package org.apache.cassandra.db.compaction;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.UUIDGen;
public class CompactionTask extends AbstractCompactionTask
@@ -159,45 +160,49 @@ public class CompactionTask extends AbstractCompactionTask
try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
{
ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- if (collector != null)
- collector.beginCompaction(ci);
- long lastCheckObsoletion = start;
-
- if (!controller.cfs.getCompactionStrategy().isActive)
- throw new CompactionInterruptedException(ci.getCompactionInfo());
-
- try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
+ try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
{
- estimatedKeys = writer.estimatedKeys();
- while (iter.hasNext())
- {
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
+ if (collector != null)
+ collector.beginCompaction(ci);
+ long lastCheckObsoletion = start;
- AbstractCompactedRow row = iter.next();
- if (writer.append(row))
- totalKeysWritten++;
+ if (!controller.cfs.getCompactionStrategy().isActive)
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
- if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
+ try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
+ {
+ estimatedKeys = writer.estimatedKeys();
+ while (iter.hasNext())
{
- controller.maybeRefreshOverlaps();
- lastCheckObsoletion = System.nanoTime();
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+ try (AbstractCompactedRow row = iter.next())
+ {
+ if (writer.append(row))
+ totalKeysWritten++;
+
+ if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
+ {
+ controller.maybeRefreshOverlaps();
+ lastCheckObsoletion = System.nanoTime();
+ }
+ }
}
- }
- // don't replace old sstables yet, as we need to mark the compaction finished in the system table
- newSStables = writer.finish();
- }
- finally
- {
- // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
- // (in replaceCompactedSSTables)
- if (taskId != null)
- SystemKeyspace.finishCompaction(taskId);
+ // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+ newSStables = writer.finish();
+ }
+ finally
+ {
+ // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+ // (in replaceCompactedSSTables)
+ if (taskId != null)
+ SystemKeyspace.finishCompaction(taskId);
- if (collector != null)
- collector.finishCompaction(ci);
+ if (collector != null)
+ collector.finishCompaction(ci);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 18d5f7b..43f998a 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -56,6 +56,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
}
@Override
+ @SuppressWarnings("resource")
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
if (!isEnabled())
@@ -366,6 +367,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
}
@Override
+ @SuppressWarnings("resource")
public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
{
LifecycleTransaction modifier = cfs.markAllCompacting(OperationType.COMPACTION);
@@ -376,6 +378,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
}
@Override
+ @SuppressWarnings("resource")
public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
{
assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index c3d764e..9eb624e 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.MergeIterator;
import org.apache.cassandra.utils.StreamingHistogram;
+import org.apache.cassandra.utils.Throwables;
/**
* LazilyCompactedRow only computes the row bloom filter and column index in memory
@@ -157,13 +158,11 @@ public class LazilyCompactedRow extends AbstractCompactedRow
// no special-case for rows.size == 1, we're actually skipping some bytes here so just
// blindly updating everything wouldn't be correct
- DataOutputBuffer out = new DataOutputBuffer();
-
- // initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator
- indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.getKey(), out);
-
- try
+ try (DataOutputBuffer out = new DataOutputBuffer())
{
+ // initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator
+ indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.getKey(), out);
+
DeletionTime.serializer.serialize(emptyColumnFamily.deletionInfo().getTopLevelDeletion(), out);
// do not update digest in case of missing or purged row level tombstones, see CASSANDRA-8979
@@ -192,6 +191,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
public void close()
{
+ Throwable accumulate = null;
for (OnDiskAtomIterator row : rows)
{
try
@@ -200,10 +200,11 @@ public class LazilyCompactedRow extends AbstractCompactedRow
}
catch (IOException e)
{
- throw new RuntimeException(e);
+ accumulate = Throwables.merge(accumulate, e);
}
}
closed = true;
+ Throwables.maybeFail(accumulate);
}
protected class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index c434d31..9eb58ff 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -88,6 +88,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
* the only difference between background and maximal in LCS is that maximal is still allowed
* (by explicit user request) even when compaction is disabled.
*/
+ @SuppressWarnings("resource")
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
if (!isEnabled())
@@ -126,6 +127,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
}
}
+ @SuppressWarnings("resource")
public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
{
Iterable<SSTableReader> sstables = manifest.getAllSSTables();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 93b76bd..10952e7 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -86,6 +86,7 @@ public class Scrubber implements Closeable
this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData);
}
+ @SuppressWarnings("resource")
public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException
{
this.cfs = cfs;
@@ -204,6 +205,7 @@ public class Scrubber implements Closeable
continue;
}
+ @SuppressWarnings("resource")
AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
if (writer.tryAppend(compactedRow) == null)
emptyRows++;
@@ -234,6 +236,7 @@ public class Scrubber implements Closeable
continue;
}
+ @SuppressWarnings("resource")
AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
if (writer.tryAppend(compactedRow) == null)
emptyRows++;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 94c3daf..74a9757 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -175,6 +175,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return sstr.getReadMeter() == null ? 0.0 : sstr.getReadMeter().twoHourRate() / sstr.estimatedKeys();
}
+ @SuppressWarnings("resource")
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
if (!isEnabled())
@@ -193,6 +194,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
}
}
+ @SuppressWarnings("resource")
public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore, boolean splitOutput)
{
Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
@@ -206,6 +208,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, txn, gcBefore, false));
}
+ @SuppressWarnings("resource")
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
{
assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 6556a71..ca975b8 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.db.compaction;
import java.io.File;
import java.util.*;
+import com.google.common.base.Throwables;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
@@ -28,6 +30,7 @@ import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.UUIDGen;
@@ -83,12 +86,13 @@ public class Upgrader
outputHandler.output("Upgrading " + sstable);
try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true);
- AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(transaction.originals()))
+ AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(transaction.originals());
+ CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator())
{
- Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator();
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
while (iter.hasNext())
{
+ @SuppressWarnings("resource")
AbstractCompactedRow row = iter.next();
writer.append(row);
}
@@ -96,6 +100,10 @@ public class Upgrader
writer.finish();
outputHandler.output("Upgrade of " + sstable + " complete.");
}
+ catch (Exception e)
+ {
+ Throwables.propagate(e);
+ }
finally
{
controller.close();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index c511bcd..adda0c9 100644
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@ -344,6 +344,7 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
}
@Override
+ @SuppressWarnings("resource")
public synchronized ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
List<SSTableReader> repairedSSTables = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 0b31061..7d88458 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -41,12 +41,14 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
{
protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
+ @SuppressWarnings("resource")
public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType compactionType)
{
super(cfs, txn, nonExpiredSSTables, offline);
logger.debug("Expected bloom filter size : {}", estimatedTotalKeys);
long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
+ @SuppressWarnings("resource")
SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
estimatedTotalKeys,
minRepairedAt,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 014b4af..95d7a0c 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -47,6 +47,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
private int sstablesWritten = 0;
private final boolean skipAncestors;
+ @SuppressWarnings("resource")
public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, OperationType compactionType)
{
super(cfs, txn, nonExpiredSSTables, offline);
@@ -61,6 +62,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
if (skipAncestors)
logger.warn("Many sstables involved in compaction, skipping storing ancestor information to avoid running out of memory");
+ @SuppressWarnings("resource")
SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
keysPerSSTable,
minRepairedAt,
@@ -71,6 +73,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
}
@Override
+ @SuppressWarnings("resource")
public boolean append(AbstractCompactedRow row)
{
long posBefore = sstableWriter.currentWriter().getOnDiskFilePointer();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 8903ff7..d30a612 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -39,6 +39,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
private final long estimatedSSTables;
private final Set<SSTableReader> allSSTables;
+ @SuppressWarnings("resource")
public MaxSSTableSizeWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType)
{
super(cfs, txn, nonExpiredSSTables, offline);
@@ -50,6 +51,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
+ @SuppressWarnings("resource")
SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
estimatedTotalKeys / estimatedSSTables,
minRepairedAt,
@@ -66,6 +68,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
{
File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
+ @SuppressWarnings("resource")
SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
estimatedTotalKeys / estimatedSSTables,
minRepairedAt,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 81ea6b1..9ff1325 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -56,6 +56,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
this(cfs, txn, nonExpiredSSTables, compactionType, DEFAULT_SMALLEST_SSTABLE_BYTES);
}
+ @SuppressWarnings("resource")
public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType, long smallestSSTable)
{
super(cfs, txn, nonExpiredSSTables, false);
@@ -83,6 +84,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
+ @SuppressWarnings("resource")
SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
currentPartitionsToWrite,
minRepairedAt,
@@ -104,6 +106,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
+ @SuppressWarnings("resource")
SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
currentPartitionsToWrite,
minRepairedAt,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 1bc772d..f3c041e 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -402,9 +402,8 @@ public class CompositeType extends AbstractCompositeType
public ByteBuffer build()
{
- try
+ try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize))
{
- DataOutputBuffer out = new DataOutputBufferFixed(serializedSize);
if (isStatic)
out.writeShort(STATIC_MARKER);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index b8c20d7..45867a3 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -177,20 +177,14 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
{
File file = FileUtils.createTempFile("failuredetector-", ".dat");
- OutputStream os = null;
- try
+ try (OutputStream os = new BufferedOutputStream(new FileOutputStream(file, true)))
{
- os = new BufferedOutputStream(new FileOutputStream(file, true));
os.write(toString().getBytes());
}
catch (IOException e)
{
throw new FSWriteError(e, file);
}
- finally
- {
- FileUtils.closeQuietly(os);
- }
}
public void setPhiConvictThreshold(double phi)
[4/4] cassandra git commit: Merge branch 'cassandra-2.2' into trunk
Posted by ja...@apache.org.
Merge branch 'cassandra-2.2' into trunk
Conflicts:
build.xml
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d91eb011
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d91eb011
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d91eb011
Branch: refs/heads/trunk
Commit: d91eb0116950fa5fc7a1a4cf93e090d476536ea3
Parents: 5746f60 7aafe05
Author: T Jake Luciani <ja...@apache.org>
Authored: Wed May 27 17:56:48 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Wed May 27 17:56:48 2015 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 40 +++-
eclipse_compiler.properties | 88 ++++++++
.../apache/cassandra/cache/AutoSavingCache.java | 1 +
.../cassandra/cache/SerializingCache.java | 5 +
.../apache/cassandra/db/BatchlogManager.java | 7 +-
.../org/apache/cassandra/db/ColumnFamily.java | 8 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 31 +--
src/java/org/apache/cassandra/db/Memtable.java | 30 +--
.../cassandra/db/MutationVerbHandler.java | 24 +-
.../apache/cassandra/db/RangeSliceReply.java | 5 +-
.../cassandra/db/SizeEstimatesRecorder.java | 17 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 8 +-
.../cassandra/db/commitlog/CommitLog.java | 8 +-
.../db/commitlog/CommitLogArchiver.java | 10 +-
.../db/commitlog/CommitLogReplayer.java | 16 +-
.../db/commitlog/CommitLogSegment.java | 1 +
.../compaction/AbstractCompactionStrategy.java | 1 +
.../db/compaction/CompactionManager.java | 222 +++++++++++--------
.../cassandra/db/compaction/CompactionTask.java | 71 +++---
.../DateTieredCompactionStrategy.java | 3 +
.../db/compaction/LazilyCompactedRow.java | 15 +-
.../compaction/LeveledCompactionStrategy.java | 2 +
.../cassandra/db/compaction/Scrubber.java | 3 +
.../SizeTieredCompactionStrategy.java | 3 +
.../cassandra/db/compaction/Upgrader.java | 12 +-
.../compaction/WrappingCompactionStrategy.java | 1 +
.../writers/DefaultCompactionWriter.java | 2 +
.../writers/MajorLeveledCompactionWriter.java | 3 +
.../writers/MaxSSTableSizeWriter.java | 3 +
.../SplittingSizeTieredCompactionWriter.java | 3 +
.../cassandra/db/marshal/CompositeType.java | 3 +-
.../apache/cassandra/gms/FailureDetector.java | 8 +-
.../hadoop/AbstractColumnFamilyInputFormat.java | 15 +-
.../hadoop/ColumnFamilyInputFormat.java | 1 +
.../hadoop/ColumnFamilyOutputFormat.java | 1 +
.../hadoop/ColumnFamilyRecordReader.java | 1 +
.../hadoop/ColumnFamilyRecordWriter.java | 1 +
.../apache/cassandra/hadoop/ConfigHelper.java | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 11 +-
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 48 ++--
.../cassandra/hadoop/pig/CqlNativeStorage.java | 21 +-
.../io/compress/CompressionMetadata.java | 65 +++---
.../cassandra/io/sstable/CQLSSTableWriter.java | 1 +
.../cassandra/io/sstable/IndexSummary.java | 14 +-
.../io/sstable/IndexSummaryBuilder.java | 1 +
.../io/sstable/IndexSummaryManager.java | 1 +
.../apache/cassandra/io/sstable/SSTable.java | 8 +-
.../cassandra/io/sstable/SSTableLoader.java | 5 +-
.../io/sstable/format/SSTableReader.java | 62 +++---
.../io/sstable/format/big/BigTableReader.java | 22 +-
.../io/sstable/format/big/BigTableWriter.java | 8 +-
.../format/big/SSTableNamesIterator.java | 2 +
.../io/util/DataIntegrityMetadata.java | 8 +-
.../cassandra/io/util/DataOutputBuffer.java | 2 +-
.../cassandra/io/util/PoolingSegmentedFile.java | 1 +
.../cassandra/io/util/RandomAccessReader.java | 4 +
.../cassandra/io/util/SafeMemoryWriter.java | 2 +-
.../cassandra/locator/CloudstackSnitch.java | 13 +-
.../cassandra/locator/PropertyFileSnitch.java | 8 +-
.../apache/cassandra/net/MessagingService.java | 5 +
.../cassandra/net/OutboundTcpConnection.java | 1 +
.../apache/cassandra/security/SSLFactory.java | 1 +
.../cassandra/service/ActiveRepairService.java | 2 +
.../cassandra/service/FileCacheService.java | 1 +
.../apache/cassandra/service/StorageProxy.java | 3 +-
.../cassandra/service/pager/PagingState.java | 3 +-
.../cassandra/streaming/ConnectionHandler.java | 5 +
.../cassandra/streaming/StreamReader.java | 1 +
.../cassandra/streaming/StreamWriter.java | 25 +--
.../compress/CompressedStreamReader.java | 1 +
.../compress/CompressedStreamWriter.java | 18 +-
.../streaming/messages/IncomingFileMessage.java | 1 +
.../streaming/messages/StreamInitMessage.java | 8 +-
.../thrift/CustomTNonBlockingServer.java | 2 +
.../thrift/CustomTThreadPoolServer.java | 15 +-
.../cassandra/thrift/SSLTransportFactory.java | 1 +
.../thrift/TCustomNonblockingServerSocket.java | 1 +
.../cassandra/thrift/TCustomServerSocket.java | 1 +
.../thrift/TFramedTransportFactory.java | 1 +
.../cassandra/thrift/THsHaDisruptorServer.java | 1 +
.../apache/cassandra/tools/SSTableExport.java | 15 +-
.../apache/cassandra/tools/SSTableImport.java | 97 ++++----
.../cassandra/tools/StandaloneScrubber.java | 7 +-
.../cassandra/tools/StandaloneVerifier.java | 8 +-
.../cassandra/utils/BloomFilterSerializer.java | 1 +
.../org/apache/cassandra/utils/FBUtilities.java | 14 +-
.../apache/cassandra/utils/FilterFactory.java | 1 +
.../utils/NativeSSTableLoaderClient.java | 4 +-
.../apache/cassandra/utils/concurrent/Ref.java | 2 +-
.../apache/cassandra/utils/concurrent/Refs.java | 6 +
.../cassandra/utils/obs/OffHeapBitSet.java | 1 +
92 files changed, 704 insertions(+), 531 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d91eb011/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4d0725c,ad2845f..ebaf169
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,9 -1,5 +1,10 @@@
+3.0:
+ * Decommissioned nodes will not rejoin the cluster (CASSANDRA-8801)
+ * Change gossip stabilization to use endpoit size (CASSANDRA-9401)
+
+
2.2
+ * Static Analysis to warn on unsafe use of Autocloseable instances (CASSANDRA-9431)
* Update commitlog archiving examples now that commitlog segments are
not recycled (CASSANDRA-9350)
* Extend Transactional API to sstable lifecycle management (CASSANDRA-8568)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d91eb011/build.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d91eb011/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
[2/4] cassandra git commit: Add Static Analysis to warn on unsafe use
of Autocloseable instances
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 2c04475..4dd53ff 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -241,13 +241,14 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
{
- Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
-
- Map<TokenRange, Set<Host>> map = new HashMap<>();
- Metadata metadata = session.getCluster().getMetadata();
- for (TokenRange tokenRange : metadata.getTokenRanges())
- map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
- return map;
+ try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
+ {
+ Map<TokenRange, Set<Host>> map = new HashMap<>();
+ Metadata metadata = session.getCluster().getMetadata();
+ for (TokenRange tokenRange : metadata.getTokenRanges())
+ map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
+ return map;
+ }
}
private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 88dd2e2..f89825f 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -61,6 +61,7 @@ public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<Byt
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
+ @SuppressWarnings("resource")
public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
{
logger.debug("Creating authenticated client for CF input format");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index 94ced69..92e3829 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -114,6 +114,7 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
* @throws Exception set of thrown exceptions may be implementation defined,
* depending on the used transport factory
*/
+ @SuppressWarnings("resource")
public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
{
logger.debug("Creating authenticated client for CF output format");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index d205f13..c103d75 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -82,6 +82,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
this.keyBufferSize = keyBufferSize;
}
+ @SuppressWarnings("resource")
public void close()
{
if (client != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 31c7047..f06f03d 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -249,6 +249,7 @@ final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer, List<Mutat
throw lastException;
}
+ @SuppressWarnings("resource")
protected void closeInternal()
{
if (client != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index b956e23..e81860d 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -543,6 +543,7 @@ public class ConfigHelper
return client;
}
+ @SuppressWarnings("resource")
public static Cassandra.Client createConnection(Configuration conf, String host, Integer port) throws IOException
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 3033fa6..9462724 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -602,13 +602,9 @@ public class CqlConfigHelper
private static SSLContext getSSLContext(String truststorePath, String truststorePassword, String keystorePath, String keystorePassword)
throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, UnrecoverableKeyException, KeyManagementException
{
- FileInputStream tsf = null;
- FileInputStream ksf = null;
SSLContext ctx = null;
- try
+ try (FileInputStream tsf = new FileInputStream(truststorePath); FileInputStream ksf = new FileInputStream(keystorePath))
{
- tsf = new FileInputStream(truststorePath);
- ksf = new FileInputStream(keystorePath);
ctx = SSLContext.getInstance("SSL");
KeyStore ts = KeyStore.getInstance("JKS");
@@ -623,11 +619,6 @@ public class CqlConfigHelper
ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
}
- finally
- {
- FileUtils.closeQuietly(tsf);
- FileUtils.closeQuietly(ksf);
- }
return ctx;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 91753a2..78b0494 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -116,24 +116,24 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
try
{
String keyspace = ConfigHelper.getOutputKeyspace(conf);
- Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace);
- ringCache = new NativeRingCache(conf);
- if (client != null)
- {
- TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
- clusterColumns = tableMetadata.getClusteringColumns();
- partitionKeyColumns = tableMetadata.getPartitionKey();
-
- String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
- if (cqlQuery.toLowerCase().startsWith("insert"))
- throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
- cql = appendKeyWhereClauses(cqlQuery);
-
- client.close();
- }
- else
+ try (Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace))
{
- throw new IllegalArgumentException("Invalid configuration specified " + conf);
+ ringCache = new NativeRingCache(conf);
+ if (client != null)
+ {
+ TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
+ clusterColumns = tableMetadata.getClusteringColumns();
+ partitionKeyColumns = tableMetadata.getPartitionKey();
+
+ String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+ if (cqlQuery.toLowerCase().startsWith("insert"))
+ throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+ cql = appendKeyWhereClauses(cqlQuery);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid configuration specified " + conf);
+ }
}
}
catch (Exception e)
@@ -489,13 +489,15 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
private void refreshEndpointMap()
{
String keyspace = ConfigHelper.getOutputKeyspace(conf);
- Session session = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace);
- rangeMap = new HashMap<>();
- metadata = session.getCluster().getMetadata();
- Set<TokenRange> ranges = metadata.getTokenRanges();
- for (TokenRange range : ranges)
+ try (Session session = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace))
{
- rangeMap.put(range, metadata.getReplicas(keyspace, range));
+ rangeMap = new HashMap<>();
+ metadata = session.getCluster().getMetadata();
+ Set<TokenRange> ranges = metadata.getTokenRanges();
+ for (TokenRange range : ranges)
+ {
+ rangeMap.put(range, metadata.getReplicas(keyspace, range));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 0b833b7..63baf9c 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -699,9 +699,8 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
// Only get the schema if we haven't already gotten it
if (!properties.containsKey(signature))
{
- try
+ try (Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect())
{
- Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect();
client.execute("USE " + keyspace);
// compose the CfDef for the columfamily
@@ -729,9 +728,11 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
{
TableInfo tableInfo = new TableInfo(cfDef);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream( baos );
- oos.writeObject( tableInfo );
- oos.close();
+ try (ObjectOutputStream oos = new ObjectOutputStream( baos ))
+ {
+ oos.writeObject(tableInfo);
+ }
+
return new String( Base64Coder.encode(baos.toByteArray()) );
}
@@ -739,11 +740,11 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
protected static TableInfo cfdefFromString(String st) throws IOException, ClassNotFoundException
{
byte [] data = Base64Coder.decode( st );
- ObjectInputStream ois = new ObjectInputStream(
- new ByteArrayInputStream( data ) );
- Object o = ois.readObject();
- ois.close();
- return (TableInfo)o;
+ try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)))
+ {
+ Object o = ois.readObject();
+ return (TableInfo)o;
+ }
}
/** decompose the query to store the parameters in a map */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index c994a3d..23a9f3e 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -91,17 +91,7 @@ public class CompressionMetadata
{
this.indexFilePath = indexFilePath;
- DataInputStream stream;
- try
- {
- stream = new DataInputStream(new FileInputStream(indexFilePath));
- }
- catch (FileNotFoundException e)
- {
- throw new RuntimeException(e);
- }
-
- try
+ try (DataInputStream stream = new DataInputStream(new FileInputStream(indexFilePath)))
{
String compressorName = stream.readUTF();
int optionCount = stream.readInt();
@@ -126,13 +116,13 @@ public class CompressionMetadata
compressedFileLength = compressedLength;
chunkOffsets = readChunkOffsets(stream);
}
- catch (IOException e)
+ catch (FileNotFoundException e)
{
- throw new CorruptSSTableException(e, indexFilePath);
+ throw new RuntimeException(e);
}
- finally
+ catch (IOException e)
{
- FileUtils.closeQuietly(stream);
+ throw new CorruptSSTableException(e, indexFilePath);
}
this.chunkOffsetsSize = chunkOffsets.size();
@@ -176,32 +166,42 @@ public class CompressionMetadata
*/
private Memory readChunkOffsets(DataInput input)
{
+ final int chunkCount;
try
{
- int chunkCount = input.readInt();
+ chunkCount = input.readInt();
if (chunkCount <= 0)
throw new IOException("Compressed file with 0 chunks encountered: " + input);
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, indexFilePath);
+ }
- Memory offsets = Memory.allocate(chunkCount * 8L);
+ @SuppressWarnings("resource")
+ Memory offsets = Memory.allocate(chunkCount * 8L);
+ int i = 0;
+ try
+ {
- for (int i = 0; i < chunkCount; i++)
+ for (i = 0; i < chunkCount; i++)
{
- try
- {
- offsets.setLong(i * 8L, input.readLong());
- }
- catch (EOFException e)
- {
- String msg = String.format("Corrupted Index File %s: read %d but expected %d chunks.",
- indexFilePath, i, chunkCount);
- throw new CorruptSSTableException(new IOException(msg, e), indexFilePath);
- }
+ offsets.setLong(i * 8L, input.readLong());
}
return offsets;
}
catch (IOException e)
{
+ if (offsets != null)
+ offsets.close();
+
+ if (e instanceof EOFException)
+ {
+ String msg = String.format("Corrupted Index File %s: read %d but expected %d chunks.",
+ indexFilePath, i, chunkCount);
+ throw new CorruptSSTableException(new IOException(msg, e), indexFilePath);
+ }
throw new FSReadError(e, indexFilePath);
}
}
@@ -345,10 +345,8 @@ public class CompressionMetadata
}
// flush the data to disk
- DataOutputStream out = null;
- try
+ try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath))))
{
- out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
writeHeader(out, dataLength, count);
for (int i = 0 ; i < count ; i++)
out.writeLong(offsets.getLong(i * 8L));
@@ -357,12 +355,9 @@ public class CompressionMetadata
{
throw Throwables.propagate(e);
}
- finally
- {
- FileUtils.closeQuietly(out);
- }
}
+ @SuppressWarnings("resource")
public CompressionMetadata open(long dataLength, long compressedLength)
{
SafeMemory offsets = this.offsets.sharedCopy();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 1389ad2..4181ed0 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -509,6 +509,7 @@ public class CQLSSTableWriter implements Closeable
}
}
+ @SuppressWarnings("resource")
public CQLSSTableWriter build()
{
if (directory == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 59c5eef..7df7349 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -289,6 +289,7 @@ public class IndexSummary extends WrappedSharedCloseable
out.write(t.entries, 0, t.entriesLength);
}
+ @SuppressWarnings("resource")
public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel, int expectedMinIndexInterval, int maxIndexInterval) throws IOException
{
int minIndexInterval = in.readInt();
@@ -321,8 +322,17 @@ public class IndexSummary extends WrappedSharedCloseable
Memory offsets = Memory.allocate(offsetCount * 4);
Memory entries = Memory.allocate(offheapSize - offsets.size());
- FBUtilities.copy(in, new MemoryOutputStream(offsets), offsets.size());
- FBUtilities.copy(in, new MemoryOutputStream(entries), entries.size());
+ try
+ {
+ FBUtilities.copy(in, new MemoryOutputStream(offsets), offsets.size());
+ FBUtilities.copy(in, new MemoryOutputStream(entries), entries.size());
+ }
+ catch (IOException ioe)
+ {
+ offsets.free();
+ entries.free();
+ throw ioe;
+ }
// our on-disk representation treats the offsets and the summary data as one contiguous structure,
// in which the offsets are based from the start of the structure. i.e., if the offsets occupy
// X bytes, the value of the first offset will be X. In memory we split the two regions up, so that
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 12e41c8..6110afe 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -281,6 +281,7 @@ public class IndexSummaryBuilder implements AutoCloseable
* @param partitioner the partitioner used for the index summary
* @return a new IndexSummary
*/
+ @SuppressWarnings("resource")
public static IndexSummary downsample(IndexSummary existing, int newSamplingLevel, int minIndexInterval, IPartitioner partitioner)
{
// To downsample the old index summary, we'll go through (potentially) several rounds of downsampling.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 9bfbc99..6f66fd3 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -202,6 +202,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
* Returns a Pair of all compacting and non-compacting sstables. Non-compacting sstables will be marked as
* compacting.
*/
+ @SuppressWarnings("resource")
private Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> getCompactingAndNonCompactingSSTables()
{
List<SSTableReader> allCompacting = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index bc3486a..2077152 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -270,10 +270,8 @@ public abstract class SSTable
protected static void appendTOC(Descriptor descriptor, Collection<Component> components)
{
File tocFile = new File(descriptor.filenameFor(Component.TOC));
- PrintWriter w = null;
- try
+ try (PrintWriter w = new PrintWriter(new FileWriter(tocFile, true)))
{
- w = new PrintWriter(new FileWriter(tocFile, true));
for (Component component : components)
w.println(component.name);
}
@@ -281,10 +279,6 @@ public abstract class SSTable
{
throw new FSWriteError(e, tocFile);
}
- finally
- {
- FileUtils.closeQuietly(w);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 15008d2..b99003b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -71,6 +71,7 @@ public class SSTableLoader implements StreamEventHandler
this.connectionsPerHost = connectionsPerHost;
}
+ @SuppressWarnings("resource")
protected Collection<SSTableReader> openSSTables(final Map<InetAddress, Collection<Range<Token>>> ranges)
{
outputHandler.output("Opening sstables and calculating sections to stream");
@@ -126,9 +127,7 @@ public class SSTableLoader implements StreamEventHandler
List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
- Ref ref = sstable.tryRef();
- if (ref == null)
- throw new IllegalStateException("Could not acquire ref for "+sstable);
+ Ref<SSTableReader> ref = sstable.ref();
StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
streamingDetails.put(endpoint, details);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 8e701b3..54dff4e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -653,16 +653,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
*/
private void loadBloomFilter() throws IOException
{
- DataInputStream stream = null;
- try
+ try (DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER)))))
{
- stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
bf = FilterFactory.deserialize(stream, true);
}
- finally
- {
- FileUtils.closeQuietly(stream);
- }
}
/**
@@ -725,9 +719,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
{
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
- RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
-
- try
+ try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))))
{
long indexSize = primaryIndex.length();
long histogramCount = sstableMetadata.estimatedRowSize.count();
@@ -768,10 +760,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
indexSummary = summaryBuilder.build(partitioner);
}
}
- finally
- {
- FileUtils.closeQuietly(primaryIndex);
- }
first = getMinimalKey(first);
last = getMinimalKey(last);
@@ -787,6 +775,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* @param dbuilder
* @return true if index summary is loaded successfully from Summary.db file.
*/
+ @SuppressWarnings("resource")
public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
{
File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
@@ -841,9 +830,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
int expectedIndexInterval = getMinIndexInterval();
while (segments.hasNext())
{
- FileDataInput in = segments.next();
- try
+ String path = null;
+ try (FileDataInput in = segments.next())
{
+ path = in.getPath();
while (!in.isEOF())
{
ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
@@ -864,11 +854,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
catch (IOException e)
{
markSuspect();
- throw new CorruptSSTableException(e, in.getPath());
- }
- finally
- {
- FileUtils.closeQuietly(in);
+ throw new CorruptSSTableException(e, path);
}
}
@@ -901,10 +887,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
if (summariesFile.exists())
FileUtils.deleteWithConfirm(summariesFile);
- DataOutputStreamPlus oStream = null;
- try
+ try (DataOutputStreamPlus oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile));)
{
- oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile));
IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
ByteBufferUtil.writeWithLength(first.getKey(), oStream);
ByteBufferUtil.writeWithLength(last.getKey(), oStream);
@@ -919,10 +903,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
if (summariesFile.exists())
FileUtils.deleteWithConfirm(summariesFile);
}
- finally
- {
- FileUtils.closeQuietly(oStream);
- }
}
public void setReplaced()
@@ -1000,6 +980,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* @return a new SSTableReader
* @throws IOException
*/
+ @SuppressWarnings("resource")
public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
{
assert descriptor.version.hasSamplingLevel();
@@ -1479,9 +1460,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
while (segments.hasNext())
{
- FileDataInput in = segments.next();
- try
+ String path = null;
+ try (FileDataInput in = segments.next();)
{
+ path = in.getPath();
while (!in.isEOF())
{
ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
@@ -1495,11 +1477,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
catch (IOException e)
{
markSuspect();
- throw new CorruptSSTableException(e, in.getPath());
- }
- finally
- {
- FileUtils.closeQuietly(in);
+ throw new CorruptSSTableException(e, path);
}
}
@@ -2027,6 +2005,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
// get a new reference to the shared DescriptorTypeTidy for this sstable
+ @SuppressWarnings("resource")
public static Ref<DescriptorTypeTidy> get(SSTableReader sstable)
{
Descriptor desc = sstable.descriptor;
@@ -2038,7 +2017,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
final DescriptorTypeTidy tidy = new DescriptorTypeTidy(desc, sstable);
refc = new Ref<>(tidy, tidy);
Ref<?> ex = lookup.putIfAbsent(desc, refc);
- assert ex == null;
+ if (ex != null)
+ {
+ refc.close();
+ throw new AssertionError();
+ }
return refc;
}
}
@@ -2119,6 +2102,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
// get a new reference to the shared GlobalTidy for this sstable
+ @SuppressWarnings("resource")
public static Ref<GlobalTidy> get(SSTableReader sstable)
{
Descriptor descriptor = sstable.descriptor;
@@ -2128,7 +2112,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
final GlobalTidy tidy = new GlobalTidy(sstable);
refc = new Ref<>(tidy, tidy);
Ref<?> ex = lookup.putIfAbsent(descriptor, refc);
- assert ex == null;
+ if (ex != null)
+ {
+ refc.close();
+ throw new AssertionError();
+ }
return refc;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index baf6d51..3f375e7 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -177,9 +177,10 @@ public class BigTableReader extends SSTableReader
Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
while (segments.hasNext())
{
- FileDataInput in = segments.next();
- try
+ String path = null;
+ try (FileDataInput in = segments.next())
{
+ path = in.getPath();
while (!in.isEOF())
{
i++;
@@ -220,11 +221,12 @@ public class BigTableReader extends SSTableReader
if (logger.isTraceEnabled())
{
// expensive sanity check! see CASSANDRA-4687
- FileDataInput fdi = dfile.getSegment(indexEntry.position);
- DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
- if (!keyInDisk.equals(key))
- throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
- fdi.close();
+ try (FileDataInput fdi = dfile.getSegment(indexEntry.position))
+ {
+ DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
+ if (!keyInDisk.equals(key))
+ throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
+ }
}
// store exact match for the key
@@ -242,11 +244,7 @@ public class BigTableReader extends SSTableReader
catch (IOException e)
{
markSuspect();
- throw new CorruptSSTableException(e, in.getPath());
- }
- finally
- {
- FileUtils.closeQuietly(in);
+ throw new CorruptSSTableException(e, path);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 0f8f0d3..30b55a0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -285,6 +285,7 @@ public class BigTableWriter extends SSTableWriter
return link;
}
+ @SuppressWarnings("resource")
public SSTableReader openEarly()
{
// find the max (exclusive) readable key
@@ -318,6 +319,7 @@ public class BigTableWriter extends SSTableWriter
return openFinal(makeTmpLinks(), SSTableReader.OpenReason.EARLY);
}
+ @SuppressWarnings("resource")
private SSTableReader openFinal(Descriptor desc, SSTableReader.OpenReason openReason)
{
if (maxDataAge < 0)
@@ -507,15 +509,13 @@ public class BigTableWriter extends SSTableWriter
if (components.contains(Component.FILTER))
{
String path = descriptor.filenameFor(Component.FILTER);
- try
+ try (FileOutputStream fos = new FileOutputStream(path);
+ DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos))
{
// bloom filter
- FileOutputStream fos = new FileOutputStream(path);
- DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos);
FilterFactory.serialize(bf, stream);
stream.flush();
SyncUtil.sync(fos);
- stream.close();
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
index c51e595..7c9a344 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
@@ -95,6 +95,7 @@ class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDis
return fileToClose;
}
+ @SuppressWarnings("resource")
private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry)
throws IOException
{
@@ -170,6 +171,7 @@ class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDis
}
}
+ @SuppressWarnings("resource")
private void readIndexedColumns(CFMetaData metadata,
FileDataInput file,
SortedSet<CellName> columnNames,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index a3c4135..4362cee 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -200,20 +200,14 @@ public class DataIntegrityMetadata
public void writeFullChecksum(Descriptor descriptor)
{
File outFile = new File(descriptor.filenameFor(Component.DIGEST));
- BufferedWriter out = null;
- try
+ try (BufferedWriter out =Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8))
{
- out = Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8);
out.write(String.valueOf(fullChecksum.getValue()));
}
catch (IOException e)
{
throw new FSWriteError(e, outFile);
}
- finally
- {
- FileUtils.closeQuietly(out);
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index b556587..6ffc895 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -93,7 +93,7 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
}
@Override
- public void close() throws IOException
+ public void close()
{
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index db9391c..a5fa20b 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -54,6 +54,7 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
}
}
+ @SuppressWarnings("resource")
public FileDataInput getSegment(long position)
{
RandomAccessReader reader = FileCacheService.instance.get(cacheKey);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 302f054..278f55c 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -229,6 +229,10 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
public void deallocate()
{
+ //make idempotent
+ if (buffer == null)
+ return;
+
bufferOffset += buffer.position();
FileUtils.clean(buffer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index 1096b5f..aad3266 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -18,7 +18,6 @@
*/
package org.apache.cassandra.io.util;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -26,6 +25,7 @@ public class SafeMemoryWriter extends DataOutputBuffer
{
private SafeMemory memory;
+ @SuppressWarnings("resource")
public SafeMemoryWriter(long initialCapacity)
{
this(new SafeMemory(initialCapacity));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/CloudstackSnitch.java b/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
index afcd083..88c62e9 100644
--- a/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
+++ b/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
@@ -174,15 +174,12 @@ public class CloudstackSnitch extends AbstractNetworkTopologySnitch
String csEndpointFromLease(File lease) throws ConfigurationException
{
- BufferedReader reader = null;
-
- String line = null;
+ String line;
String endpoint = null;
Pattern identifierPattern = Pattern.compile("^[ \t]*option dhcp-server-identifier (.*);$");
- try
+ try (BufferedReader reader = new BufferedReader(new FileReader(lease)))
{
- reader = new BufferedReader(new FileReader(lease));
while ((line = reader.readLine()) != null)
{
@@ -194,14 +191,10 @@ public class CloudstackSnitch extends AbstractNetworkTopologySnitch
break;
}
}
- }
+ }
catch (Exception e)
{
throw new ConfigurationException("CloudstackSnitch cannot access lease file.");
- }
- finally
- {
- FileUtils.closeQuietly(reader);
}
if (endpoint == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
index 247eb00..8665816 100644
--- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
@@ -136,20 +136,14 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
HashMap<InetAddress, String[]> reloadedMap = new HashMap<InetAddress, String[]>();
Properties properties = new Properties();
- InputStream stream = null;
- try
+ try (InputStream stream = getClass().getClassLoader().getResourceAsStream(SNITCH_PROPERTIES_FILENAME))
{
- stream = getClass().getClassLoader().getResourceAsStream(SNITCH_PROPERTIES_FILENAME);
properties.load(stream);
}
catch (Exception e)
{
throw new ConfigurationException("Unable to read " + SNITCH_PROPERTIES_FILENAME, e);
}
- finally
- {
- FileUtils.closeQuietly(stream);
- }
for (Map.Entry<Object, Object> entry : properties.entrySet())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c54d5ee..293a27c 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -436,6 +436,7 @@ public final class MessagingService implements MessagingServiceMBean
listenGate.signalAll();
}
+ @SuppressWarnings("resource")
private List<ServerSocket> getServerSockets(InetAddress localEp) throws ConfigurationException
{
final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
@@ -471,6 +472,7 @@ public final class MessagingService implements MessagingServiceMBean
}
catch (SocketException e)
{
+ FileUtils.closeQuietly(socket);
throw new ConfigurationException("Insufficient permissions to setReuseAddress", e);
}
InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
@@ -480,6 +482,7 @@ public final class MessagingService implements MessagingServiceMBean
}
catch (BindException e)
{
+ FileUtils.closeQuietly(socket);
if (e.getMessage().contains("in use"))
throw new ConfigurationException(address + " is in use by another process. Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
else if (e.getMessage().contains("Cannot assign requested address"))
@@ -490,6 +493,7 @@ public final class MessagingService implements MessagingServiceMBean
}
catch (IOException e)
{
+ FileUtils.closeQuietly(socket);
throw new RuntimeException(e);
}
logger.info("Starting Messaging Service on port {}", DatabaseDescriptor.getStoragePort());
@@ -874,6 +878,7 @@ public final class MessagingService implements MessagingServiceMBean
this.server = server;
}
+ @SuppressWarnings("resource")
public void run()
{
while (!server.isClosed())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 78ef615..0eb8e02 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -366,6 +366,7 @@ public class OutboundTcpConnection extends Thread
}
}
+ @SuppressWarnings("resource")
private boolean connect()
{
if (logger.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/security/SSLFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/SSLFactory.java b/src/java/org/apache/cassandra/security/SSLFactory.java
index 956ba67..e9aa07d 100644
--- a/src/java/org/apache/cassandra/security/SSLFactory.java
+++ b/src/java/org/apache/cassandra/security/SSLFactory.java
@@ -99,6 +99,7 @@ public final class SSLFactory
return socket;
}
+ @SuppressWarnings("resource")
public static SSLContext createSSLContext(EncryptionOptions options, boolean buildTruststore) throws IOException
{
FileInputStream tsf = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index d350f4e..213edeb 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -341,6 +341,7 @@ public class ActiveRepairService
* @param parentRepairSession parent repair session ID
* @return Future result of all anti-compaction jobs.
*/
+ @SuppressWarnings("resource")
public ListenableFuture<List<Object>> doAntiCompaction(final UUID parentRepairSession, Collection<Range<Token>> successfulRanges)
{
assert parentRepairSession != null;
@@ -420,6 +421,7 @@ public class ActiveRepairService
this.sstableMap.put(cfId, existingSSTables);
}
+ @SuppressWarnings("resource")
public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID cfId)
{
Set<SSTableReader> sstables = sstableMap.get(cfId);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/service/FileCacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java
index 250e625..1e12310 100644
--- a/src/java/org/apache/cassandra/service/FileCacheService.java
+++ b/src/java/org/apache/cassandra/service/FileCacheService.java
@@ -143,6 +143,7 @@ public class FileCacheService
}
}
+ @SuppressWarnings("resource")
public void put(CacheKey cacheKey, RandomAccessReader instance)
{
int memoryUsed = memoryUsage.get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 78376a8..7801c3e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1025,8 +1025,7 @@ public class StorageProxy implements StorageProxyMBean
InetAddress target = iter.next();
// Add the other destinations of the same message as a FORWARD_HEADER entry
- DataOutputBuffer out = new DataOutputBuffer();
- try
+ try (DataOutputBuffer out = new DataOutputBuffer())
{
out.writeInt(targets.size() - 1);
while (iter.hasNext())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index ff461ab..f168880 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -60,9 +60,8 @@ public class PagingState
public ByteBuffer serialize()
{
- try
+ try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize()))
{
- DataOutputBuffer out = new DataOutputBufferFixed(serializedSize());
ByteBufferUtil.writeWithShortLength(partitionKey, out);
ByteBufferUtil.writeWithShortLength(cellName, out);
out.writeInt(remaining);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 6a33b40..681f61e 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -76,6 +76,7 @@ public class ConnectionHandler
*
* @throws IOException
*/
+ @SuppressWarnings("resource")
public void initiate() throws IOException
{
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
@@ -157,6 +158,7 @@ public class ConnectionHandler
protected abstract String name();
+ @SuppressWarnings("resource")
protected static DataOutputStreamPlus getWriteChannel(Socket socket) throws IOException
{
WritableByteChannel out = socket.getChannel();
@@ -175,6 +177,7 @@ public class ConnectionHandler
: in;
}
+ @SuppressWarnings("resource")
public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
{
StreamInitMessage message = new StreamInitMessage(
@@ -246,6 +249,7 @@ public class ConnectionHandler
return "STREAM-IN";
}
+ @SuppressWarnings("resource")
public void run()
{
try
@@ -315,6 +319,7 @@ public class ConnectionHandler
messageQueue.put(message);
}
+ @SuppressWarnings("resource")
public void run()
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 0f3ebb3..1a3980d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -80,6 +80,7 @@ public class StreamReader
* @return SSTable transferred
* @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
*/
+ @SuppressWarnings("resource")
public SSTableWriter read(ReadableByteChannel channel) throws IOException
{
logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 392dccd..106677c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -70,18 +70,19 @@ public class StreamWriter
public void write(DataOutputStreamPlus output) throws IOException
{
long totalSize = totalSize();
- RandomAccessReader file = sstable.openDataReader();
- ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
- ? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
- : null;
- transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
- // setting up data compression stream
- compressedOutput = new LZFOutputStream(output);
- long progress = 0L;
- try
+ try(RandomAccessReader file = sstable.openDataReader();
+ ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
+ ? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
+ : null;)
{
+ transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
+
+ // setting up data compression stream
+ compressedOutput = new LZFOutputStream(output);
+ long progress = 0L;
+
// stream each of the required sections of the file
for (Pair<Long, Long> section : sections)
{
@@ -109,12 +110,6 @@ public class StreamWriter
compressedOutput.flush();
}
}
- finally
- {
- // no matter what happens close file
- FileUtils.closeQuietly(file);
- FileUtils.closeQuietly(validator);
- }
}
protected long totalSize()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 89773ea..1936a94 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -61,6 +61,7 @@ public class CompressedStreamReader extends StreamReader
* @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
*/
@Override
+ @SuppressWarnings("resource")
public SSTableWriter read(ReadableByteChannel channel) throws IOException
{
logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 2fd7f63..144980c 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -55,14 +55,11 @@ public class CompressedStreamWriter extends StreamWriter
public void write(DataOutputStreamPlus out) throws IOException
{
long totalSize = totalSize();
- RandomAccessReader file = sstable.openDataReader();
- final ChannelProxy fc = file.getChannel();
-
- long progress = 0L;
- // calculate chunks to transfer. we want to send continuous chunks altogether.
- List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
- try
+ try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel())
{
+ long progress = 0L;
+ // calculate chunks to transfer. we want to send continuous chunks altogether.
+ List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
// stream each of the required sections of the file
for (final Pair<Long, Long> section : sections)
{
@@ -75,7 +72,7 @@ public class CompressedStreamWriter extends StreamWriter
final long bytesTransferredFinal = bytesTransferred;
final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
limiter.acquire(toTransfer);
- long lastWrite = out.applyToChannel( new Function<WritableByteChannel, Long>()
+ long lastWrite = out.applyToChannel(new Function<WritableByteChannel, Long>()
{
public Long apply(WritableByteChannel wbc)
{
@@ -88,11 +85,6 @@ public class CompressedStreamWriter extends StreamWriter
}
}
}
- finally
- {
- // no matter what happens close file
- FileUtils.closeQuietly(file);
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 1418651..fdfb32e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -36,6 +36,7 @@ public class IncomingFileMessage extends StreamMessage
{
public static Serializer<IncomingFileMessage> serializer = new Serializer<IncomingFileMessage>()
{
+ @SuppressWarnings("resource")
public IncomingFileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
{
DataInputStream input = new DataInputStream(Channels.newInputStream(in));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 4928039..e8b3f82 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -83,9 +83,11 @@ public class StreamInitMessage
try
{
int size = (int)StreamInitMessage.serializer.serializedSize(this, version);
- DataOutputBuffer buffer = new DataOutputBufferFixed(size);
- StreamInitMessage.serializer.serialize(this, buffer, version);
- bytes = buffer.getData();
+ try (DataOutputBuffer buffer = new DataOutputBufferFixed(size))
+ {
+ StreamInitMessage.serializer.serialize(this, buffer, version);
+ bytes = buffer.getData();
+ }
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
index 11fcc5e..de8df57 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
@@ -37,6 +37,7 @@ public class CustomTNonBlockingServer extends TNonblockingServer
}
@Override
+ @SuppressWarnings("resource")
protected boolean requestInvoke(FrameBuffer frameBuffer)
{
TNonblockingSocket socket = (TNonblockingSocket)((CustomFrameBuffer)frameBuffer).getTransport();
@@ -47,6 +48,7 @@ public class CustomTNonBlockingServer extends TNonblockingServer
public static class Factory implements TServerFactory
{
+ @SuppressWarnings("resource")
public TServer buildTServer(Args args)
{
if (DatabaseDescriptor.getClientEncryptionOptions().enabled)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
index e7584c9..a025004 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
@@ -84,6 +84,7 @@ public class CustomTThreadPoolServer extends TServer
this.args = args;
}
+ @SuppressWarnings("resource")
public void serve()
{
try
@@ -184,18 +185,16 @@ public class CustomTThreadPoolServer extends TServer
public void run()
{
TProcessor processor = null;
- TTransport inputTransport = null;
- TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
SocketAddress socket = null;
- try
+ try (TTransport inputTransport = inputTransportFactory_.getTransport(client_);
+ TTransport outputTransport = outputTransportFactory_.getTransport(client_))
{
socket = ((TCustomSocket) client_).getSocket().getRemoteSocketAddress();
ThriftSessionManager.instance.setCurrentSocket(socket);
processor = processorFactory_.getProcessor(client_);
- inputTransport = inputTransportFactory_.getTransport(client_);
- outputTransport = outputTransportFactory_.getTransport(client_);
+
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
// we check stopped first to make sure we're not supposed to be shutting
@@ -227,10 +226,7 @@ public class CustomTThreadPoolServer extends TServer
{
if (socket != null)
ThriftSessionManager.instance.connectionComplete(socket);
- if (inputTransport != null)
- inputTransport.close();
- if (outputTransport != null)
- outputTransport.close();
+
activeClients.decrementAndGet();
}
}
@@ -238,6 +234,7 @@ public class CustomTThreadPoolServer extends TServer
public static class Factory implements TServerFactory
{
+ @SuppressWarnings("resource")
public TServer buildTServer(Args args)
{
final InetSocketAddress addr = args.addr;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/SSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/SSLTransportFactory.java b/src/java/org/apache/cassandra/thrift/SSLTransportFactory.java
index d80d76e..ea74b94 100644
--- a/src/java/org/apache/cassandra/thrift/SSLTransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/SSLTransportFactory.java
@@ -52,6 +52,7 @@ public class SSLTransportFactory implements ITransportFactory
private String[] cipherSuites;
@Override
+ @SuppressWarnings("resource")
public TTransport openTransport(String host, int port) throws Exception
{
TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters(protocol, cipherSuites);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java b/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
index 63466b8..a430721 100644
--- a/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
+++ b/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
@@ -43,6 +43,7 @@ public class TCustomNonblockingServerSocket extends TNonblockingServerSocket
}
@Override
+ @SuppressWarnings("resource")
protected TNonblockingSocket acceptImpl() throws TTransportException
{
TNonblockingSocket tsocket = super.acceptImpl();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java b/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
index 477ef8c..8e27481 100644
--- a/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
+++ b/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
@@ -89,6 +89,7 @@ public class TCustomServerSocket extends TServerTransport
}
@Override
+ @SuppressWarnings("resource")
protected TCustomSocket acceptImpl() throws TTransportException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
index a4c6bb7..7bf0b96 100644
--- a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
@@ -34,6 +34,7 @@ public class TFramedTransportFactory implements ITransportFactory
private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
private int thriftFramedTransportSizeMb = 15; // 15Mb is the default for C* & Hadoop ConfigHelper
+ @SuppressWarnings("resource")
public TTransport openTransport(String host, int port) throws TTransportException
{
TSocket socket = new TSocket(host, port);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java b/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
index dd501ec..37bc440 100644
--- a/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
+++ b/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
@@ -67,6 +67,7 @@ public class THsHaDisruptorServer extends TDisruptorServer
public static class Factory implements TServerFactory
{
+ @SuppressWarnings("resource")
public TServer buildTServer(Args args)
{
if (DatabaseDescriptor.getClientEncryptionOptions().enabled)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 61edad2..bc460a1 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -225,8 +225,7 @@ public class SSTableExport
public static void enumeratekeys(Descriptor desc, PrintStream outs, CFMetaData metadata)
throws IOException
{
- KeyIterator iter = new KeyIterator(desc);
- try
+ try (KeyIterator iter = new KeyIterator(desc))
{
DecoratedKey lastKey = null;
while (iter.hasNext())
@@ -242,10 +241,6 @@ public class SSTableExport
checkStream(outs); // flushes
}
}
- finally
- {
- iter.close();
- }
}
/**
@@ -261,8 +256,8 @@ public class SSTableExport
public static void export(Descriptor desc, PrintStream outs, Collection<String> toExport, String[] excludes, CFMetaData metadata) throws IOException
{
SSTableReader sstable = SSTableReader.open(desc);
- RandomAccessReader dfile = sstable.openDataReader();
- try
+
+ try (RandomAccessReader dfile = sstable.openDataReader())
{
IPartitioner partitioner = sstable.partitioner;
@@ -305,10 +300,6 @@ public class SSTableExport
outs.println("\n]");
outs.flush();
}
- finally
- {
- dfile.close();
- }
}
// This is necessary to accommodate the test suite since you cannot open a Reader more
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
index 7b187ac..2fda6bd 100644
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableImport.java
@@ -299,54 +299,58 @@ public class SSTableImport
int importedKeys = 0;
long start = System.nanoTime();
- JsonParser parser = getParser(jsonFile);
-
- Object[] data = parser.readValueAs(new TypeReference<Object[]>(){});
+ Object[] data;
+ try (JsonParser parser = getParser(jsonFile))
+ {
+ data = parser.readValueAs(new TypeReference<Object[]>(){});
+ }
keyCountToImport = (keyCountToImport == null) ? data.length : keyCountToImport;
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE, 0);
-
- System.out.printf("Importing %s keys...%n", keyCountToImport);
-
- // sort by dk representation, but hold onto the hex version
- SortedMap<DecoratedKey,Map<?, ?>> decoratedKeys = new TreeMap<DecoratedKey,Map<?, ?>>();
- for (Object row : data)
+ try (SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE, 0))
{
- Map<?,?> rowAsMap = (Map<?, ?>)row;
- decoratedKeys.put(partitioner.decorateKey(getKeyValidator(columnFamily).fromString((String) rowAsMap.get("key"))), rowAsMap);
- }
+ System.out.printf("Importing %s keys...%n", keyCountToImport);
- for (Map.Entry<DecoratedKey, Map<?, ?>> row : decoratedKeys.entrySet())
- {
- if (row.getValue().containsKey("metadata"))
+ // sort by dk representation, but hold onto the hex version
+ SortedMap<DecoratedKey, Map<?, ?>> decoratedKeys = new TreeMap<DecoratedKey, Map<?, ?>>();
+
+ for (Object row : data)
{
- parseMeta((Map<?, ?>) row.getValue().get("metadata"), columnFamily, null);
+ Map<?, ?> rowAsMap = (Map<?, ?>) row;
+ decoratedKeys.put(partitioner.decorateKey(getKeyValidator(columnFamily).fromString((String) rowAsMap.get("key"))), rowAsMap);
}
- Object columns = row.getValue().get("cells");
- addColumnsToCF((List<?>) columns, columnFamily);
+ for (Map.Entry<DecoratedKey, Map<?, ?>> row : decoratedKeys.entrySet())
+ {
+ if (row.getValue().containsKey("metadata"))
+ {
+ parseMeta((Map<?, ?>) row.getValue().get("metadata"), columnFamily, null);
+ }
+ Object columns = row.getValue().get("cells");
+ addColumnsToCF((List<?>) columns, columnFamily);
- writer.append(row.getKey(), columnFamily);
- columnFamily.clear();
- importedKeys++;
+ writer.append(row.getKey(), columnFamily);
+ columnFamily.clear();
+
+ importedKeys++;
- long current = System.nanoTime();
+ long current = System.nanoTime();
- if (TimeUnit.NANOSECONDS.toSeconds(current - start) >= 5) // 5 secs.
- {
- System.out.printf("Currently imported %d keys.%n", importedKeys);
- start = current;
+ if (TimeUnit.NANOSECONDS.toSeconds(current - start) >= 5) // 5 secs.
+ {
+ System.out.printf("Currently imported %d keys.%n", importedKeys);
+ start = current;
+ }
+
+ if (keyCountToImport == importedKeys)
+ break;
}
- if (keyCountToImport == importedKeys)
- break;
+ writer.finish(true);
}
- writer.finish(true);
-
return importedKeys;
}
@@ -356,28 +360,29 @@ public class SSTableImport
int importedKeys = 0; // already imported keys count
long start = System.nanoTime();
- JsonParser parser = getParser(jsonFile);
-
- if (keyCountToImport == null)
+ try (JsonParser parser = getParser(jsonFile))
{
- keyCountToImport = 0;
- System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)");
- parser.nextToken(); // START_ARRAY
- while (parser.nextToken() != null)
+ if (keyCountToImport == null)
{
- parser.skipChildren();
- if (parser.getCurrentToken() == JsonToken.END_ARRAY)
- break;
+ keyCountToImport = 0;
+ System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)");
- keyCountToImport++;
+ parser.nextToken(); // START_ARRAY
+ while (parser.nextToken() != null)
+ {
+ parser.skipChildren();
+ if (parser.getCurrentToken() == JsonToken.END_ARRAY)
+ break;
+
+ keyCountToImport++;
+ }
}
+ System.out.printf("Importing %s keys...%n", keyCountToImport);
}
- System.out.printf("Importing %s keys...%n", keyCountToImport);
-
- parser = getParser(jsonFile); // renewing parser
- try (SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);)
+ try (JsonParser parser = getParser(jsonFile); // renewing parser
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);)
{
int lineNumber = 1;
DecoratedKey prevStoredKey = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index d32ef88..dd513b8 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -119,8 +119,7 @@ public class StandaloneScrubber
{
try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable))
{
- Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, true, !options.noValidate);
- try
+ try (Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, true, !options.noValidate))
{
scrubber.scrub();
}
@@ -132,10 +131,6 @@ public class StandaloneScrubber
throw t;
}
}
- finally
- {
- scrubber.close();
- }
// Remove the sstable (it's been copied by scrub and snapshotted)
sstable.markObsolete();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
index a4f3e80..f71f58d 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
@@ -100,8 +100,8 @@ public class StandaloneVerifier
{
try
{
- Verifier verifier = new Verifier(cfs, sstable, handler, true);
- try
+
+ try (Verifier verifier = new Verifier(cfs, sstable, handler, true))
{
verifier.verify(extended);
}
@@ -110,10 +110,6 @@ public class StandaloneVerifier
System.err.println(String.format("Error verifying %s: %s", sstable, cs.getMessage()));
hasFailed = true;
}
- finally
- {
- verifier.close();
- }
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
index 5fad3ea..6f80ac0 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
@@ -40,6 +40,7 @@ class BloomFilterSerializer implements ISerializer<BloomFilter>
return deserialize(in, false);
}
+ @SuppressWarnings("resource")
public BloomFilter deserialize(DataInput in, boolean offheap) throws IOException
{
int hashes = in.readInt();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index c20e33e..17edeb0 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -347,10 +347,8 @@ public class FBUtilities
public static String getReleaseVersionString()
{
- InputStream in = null;
- try
+ try (InputStream in = FBUtilities.class.getClassLoader().getResourceAsStream("org/apache/cassandra/config/version.properties"))
{
- in = FBUtilities.class.getClassLoader().getResourceAsStream("org/apache/cassandra/config/version.properties");
if (in == null)
{
return System.getProperty("cassandra.releaseVersion", "Unknown");
@@ -365,10 +363,6 @@ public class FBUtilities
logger.warn("Unable to load version.properties", e);
return "debug version";
}
- finally
- {
- FileUtils.closeQuietly(in);
- }
}
public static long timestampMicros()
@@ -718,10 +712,10 @@ public class FBUtilities
public static <T> byte[] serialize(T object, IVersionedSerializer<T> serializer, int version)
{
- try
+ int size = (int) serializer.serializedSize(object, version);
+
+ try (DataOutputBuffer buffer = new DataOutputBufferFixed(size))
{
- int size = (int) serializer.serializedSize(object, version);
- DataOutputBuffer buffer = new DataOutputBufferFixed(size);
serializer.serialize(object, buffer, version);
assert buffer.getLength() == size && buffer.getData().length == size
: String.format("Final buffer length %s to accommodate data size of %s (predicted %s) for %s",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/utils/FilterFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java
index 7cfc332..d77500c 100644
--- a/src/java/org/apache/cassandra/utils/FilterFactory.java
+++ b/src/java/org/apache/cassandra/utils/FilterFactory.java
@@ -78,6 +78,7 @@ public class FilterFactory
return createFilter(spec.K, numElements, spec.bucketsPerElement, offheap);
}
+ @SuppressWarnings("resource")
private static IFilter createFilter(int hash, long numElements, int bucketsPer, boolean offheap)
{
long numBits = (numElements * bucketsPer) + BITSET_EXCESS;