You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/09/18 14:20:00 UTC
[1/3] cassandra git commit: Refactor TransactionLog code and fix
order of cleanup bug on Windows
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 ff27eb304 -> c163d0bc3
refs/heads/trunk bcba9b7f6 -> aefb2a6ff
Refactor TransactionLog code and fix order of cleanup bug on Windows
patch by stefania; reviewed by benedict for CASSANDRA-10286
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c163d0bc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c163d0bc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c163d0bc
Branch: refs/heads/cassandra-3.0
Commit: c163d0bc365239f4960ab2e19fb72a0ff785afa8
Parents: ff27eb3
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Wed Sep 9 11:26:14 2015 +0800
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 18 13:18:05 2015 +0100
----------------------------------------------------------------------
.../io/compress/CompressedSequentialWriter.java | 3 +--
.../io/compress/CompressionMetadata.java | 4 +--
.../cassandra/io/sstable/SSTableTxnWriter.java | 4 +--
.../io/sstable/format/big/BigTableWriter.java | 8 ++----
.../io/util/ChecksummedSequentialWriter.java | 5 +---
.../cassandra/io/util/SequentialWriter.java | 27 +-------------------
.../utils/concurrent/Transactional.java | 5 ++--
.../CompressedSequentialWriterTest.java | 1 -
.../cassandra/io/sstable/SSTableLoaderTest.java | 10 ++++++--
.../util/ChecksummedSequentialWriterTest.java | 1 -
.../cassandra/io/util/SequentialWriterTest.java | 1 -
11 files changed, 20 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 8e1ebff..bbec6f5 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -263,7 +263,7 @@ public class CompressedSequentialWriter extends SequentialWriter
@Override
protected Throwable doCommit(Throwable accumulate)
{
- return metadataWriter.commit(accumulate);
+ return super.doCommit(metadataWriter.commit(accumulate));
}
@Override
@@ -278,7 +278,6 @@ public class CompressedSequentialWriter extends SequentialWriter
syncInternal();
if (descriptor != null)
crcMetadata.writeFullChecksum(descriptor);
- releaseFileHandle();
sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 1681b0c..04ef2d3 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -410,7 +410,7 @@ public class CompressionMetadata
count = chunkIndex;
}
- protected Throwable doPreCleanup(Throwable failed)
+ protected Throwable doPostCleanup(Throwable failed)
{
return offsets.close(failed);
}
@@ -422,7 +422,7 @@ public class CompressionMetadata
protected Throwable doAbort(Throwable accumulate)
{
- return FileUtils.deleteWithConfirm(filePath, false, accumulate);
+ return accumulate;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 6e1ac38..5d65a30 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -69,13 +69,13 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
protected Throwable doAbort(Throwable accumulate)
{
- return writer.abort(txn.abort(accumulate));
+ return txn.abort(writer.abort(accumulate));
}
protected void doPrepare()
{
- txn.prepareToCommit();
writer.prepareToCommit();
+ txn.prepareToCommit();
}
public Collection<SSTableReader> finish(boolean openResult)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 06dd508..d2500b4 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -81,10 +81,6 @@ public class BigTableWriter extends SSTableWriter
dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false);
}
iwriter = new IndexWriter(keyCount, dataFile);
-
- // txnLogs will delete if safe to do so (early readers)
- iwriter.indexFile.deleteFile(false);
- dataFile.deleteFile(false);
}
public void mark()
@@ -322,7 +318,7 @@ public class BigTableWriter extends SSTableWriter
}
@Override
- protected Throwable doPreCleanup(Throwable accumulate)
+ protected Throwable doPostCleanup(Throwable accumulate)
{
accumulate = dbuilder.close(accumulate);
return accumulate;
@@ -485,7 +481,7 @@ public class BigTableWriter extends SSTableWriter
}
@Override
- protected Throwable doPreCleanup(Throwable accumulate)
+ protected Throwable doPostCleanup(Throwable accumulate)
{
accumulate = summary.close(accumulate);
accumulate = bf.close(accumulate);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index 8203a37..fd88151 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -50,7 +50,7 @@ public class ChecksummedSequentialWriter extends SequentialWriter
@Override
protected Throwable doCommit(Throwable accumulate)
{
- return crcWriter.commit(accumulate);
+ return super.doCommit(crcWriter.commit(accumulate));
}
@Override
@@ -66,9 +66,6 @@ public class ChecksummedSequentialWriter extends SequentialWriter
if (descriptor != null)
crcMetadata.writeFullChecksum(descriptor);
crcWriter.setDescriptor(descriptor).prepareToCommit();
- // we must cleanup our file handles during prepareCommit for Windows compatibility as we cannot rename an open file;
- // TODO: once we stop file renaming, remove this for clarity
- releaseFileHandle();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 6000f95..5bdc15a 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -68,8 +68,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
// due to lack of multiple-inheritance, we proxy our transactional implementation
protected class TransactionalProxy extends AbstractTransactional
{
- private boolean deleteFile = true;
-
@Override
protected Throwable doPreCleanup(Throwable accumulate)
{
@@ -90,9 +88,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
protected void doPrepare()
{
syncInternal();
- // we must cleanup our file handles during prepareCommit for Windows compatibility as we cannot rename an open file;
- // TODO: once we stop file renaming, remove this for clarity
- releaseFileHandle();
}
protected Throwable doCommit(Throwable accumulate)
@@ -102,10 +97,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
protected Throwable doAbort(Throwable accumulate)
{
- if (deleteFile)
- return FileUtils.deleteWithConfirm(filePath, false, accumulate);
- else
- return accumulate;
+ return accumulate;
}
}
@@ -409,23 +401,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
return new TransactionalProxy();
}
- public void deleteFile(boolean val)
- {
- txnProxy.deleteFile = val;
- }
-
- public void releaseFileHandle()
- {
- try
- {
- channel.close();
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, filePath);
- }
- }
-
/**
* Class to hold a mark to the position of the file
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index 02562ce..d142f06 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -88,7 +88,8 @@ public interface Transactional extends AutoCloseable
// Transactional objects will perform cleanup in the commit() or abort() calls
/**
- * perform an exception-safe pre-abort cleanup; this will still be run *after* commit
+ * perform an exception-safe pre-abort/commit cleanup;
+ * this will be run after prepareToCommit (so before commit), and before abort
*/
protected Throwable doPreCleanup(Throwable accumulate){ return accumulate; }
@@ -113,7 +114,6 @@ public interface Transactional extends AutoCloseable
if (state != State.READY_TO_COMMIT)
throw new IllegalStateException("Cannot commit unless READY_TO_COMMIT; state is " + state);
accumulate = doCommit(accumulate);
- accumulate = doPreCleanup(accumulate);
accumulate = doPostCleanup(accumulate);
state = State.COMMITTED;
return accumulate;
@@ -171,6 +171,7 @@ public interface Transactional extends AutoCloseable
throw new IllegalStateException("Cannot prepare to commit unless IN_PROGRESS; state is " + state);
doPrepare();
+ maybeFail(doPreCleanup(null));
state = State.READY_TO_COMMIT;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 1bdc591..56c83da 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -222,7 +222,6 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
protected void assertAborted() throws Exception
{
super.assertAborted();
- Assert.assertFalse(offsetsFile.exists());
}
void cleanup()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index faa9c3e..ad7523d 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -131,11 +131,14 @@ public class SSTableLoaderTest
writer.addRow("key1", "col1", "100");
}
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+ cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them
+
final CountDownLatch latch = new CountDownLatch(1);
SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
- List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1)).build());
+ List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
assertEquals(1, partitions.size());
assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
@@ -175,6 +178,9 @@ public class SSTableLoaderTest
writer.addRow(String.format("key%d", i), String.format("col%d", j), "100");
}
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2);
+ cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them
+
//make sure we have some tables...
assertTrue(dataDir.listFiles().length > 0);
@@ -183,7 +189,7 @@ public class SSTableLoaderTest
SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
- List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
+ List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
assertTrue(partitions.size() > 0 && partitions.size() < NB_PARTITIONS);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
index 9731a8d..bea3aac 100644
--- a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
@@ -85,7 +85,6 @@ public class ChecksummedSequentialWriterTest extends SequentialWriterTest
protected void assertAborted() throws Exception
{
super.assertAborted();
- Assert.assertFalse(crcFile.exists());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index fd38427..f5a366e 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -102,7 +102,6 @@ public class SequentialWriterTest extends AbstractTransactionalTest
protected void assertAborted() throws Exception
{
Assert.assertFalse(writer.isOpen());
- Assert.assertFalse(file.exists());
}
protected void assertCommitted() throws Exception
[2/3] cassandra git commit: Refactor TransactionLog code and fix
order of cleanup bug on Windows
Posted by be...@apache.org.
Refactor TransactionLog code and fix order of cleanup bug on Windows
patch by stefania; reviewed by benedict for CASSANDRA-10286
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c163d0bc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c163d0bc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c163d0bc
Branch: refs/heads/trunk
Commit: c163d0bc365239f4960ab2e19fb72a0ff785afa8
Parents: ff27eb3
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Wed Sep 9 11:26:14 2015 +0800
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 18 13:18:05 2015 +0100
----------------------------------------------------------------------
.../io/compress/CompressedSequentialWriter.java | 3 +--
.../io/compress/CompressionMetadata.java | 4 +--
.../cassandra/io/sstable/SSTableTxnWriter.java | 4 +--
.../io/sstable/format/big/BigTableWriter.java | 8 ++----
.../io/util/ChecksummedSequentialWriter.java | 5 +---
.../cassandra/io/util/SequentialWriter.java | 27 +-------------------
.../utils/concurrent/Transactional.java | 5 ++--
.../CompressedSequentialWriterTest.java | 1 -
.../cassandra/io/sstable/SSTableLoaderTest.java | 10 ++++++--
.../util/ChecksummedSequentialWriterTest.java | 1 -
.../cassandra/io/util/SequentialWriterTest.java | 1 -
11 files changed, 20 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 8e1ebff..bbec6f5 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -263,7 +263,7 @@ public class CompressedSequentialWriter extends SequentialWriter
@Override
protected Throwable doCommit(Throwable accumulate)
{
- return metadataWriter.commit(accumulate);
+ return super.doCommit(metadataWriter.commit(accumulate));
}
@Override
@@ -278,7 +278,6 @@ public class CompressedSequentialWriter extends SequentialWriter
syncInternal();
if (descriptor != null)
crcMetadata.writeFullChecksum(descriptor);
- releaseFileHandle();
sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 1681b0c..04ef2d3 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -410,7 +410,7 @@ public class CompressionMetadata
count = chunkIndex;
}
- protected Throwable doPreCleanup(Throwable failed)
+ protected Throwable doPostCleanup(Throwable failed)
{
return offsets.close(failed);
}
@@ -422,7 +422,7 @@ public class CompressionMetadata
protected Throwable doAbort(Throwable accumulate)
{
- return FileUtils.deleteWithConfirm(filePath, false, accumulate);
+ return accumulate;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 6e1ac38..5d65a30 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -69,13 +69,13 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
protected Throwable doAbort(Throwable accumulate)
{
- return writer.abort(txn.abort(accumulate));
+ return txn.abort(writer.abort(accumulate));
}
protected void doPrepare()
{
- txn.prepareToCommit();
writer.prepareToCommit();
+ txn.prepareToCommit();
}
public Collection<SSTableReader> finish(boolean openResult)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 06dd508..d2500b4 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -81,10 +81,6 @@ public class BigTableWriter extends SSTableWriter
dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false);
}
iwriter = new IndexWriter(keyCount, dataFile);
-
- // txnLogs will delete if safe to do so (early readers)
- iwriter.indexFile.deleteFile(false);
- dataFile.deleteFile(false);
}
public void mark()
@@ -322,7 +318,7 @@ public class BigTableWriter extends SSTableWriter
}
@Override
- protected Throwable doPreCleanup(Throwable accumulate)
+ protected Throwable doPostCleanup(Throwable accumulate)
{
accumulate = dbuilder.close(accumulate);
return accumulate;
@@ -485,7 +481,7 @@ public class BigTableWriter extends SSTableWriter
}
@Override
- protected Throwable doPreCleanup(Throwable accumulate)
+ protected Throwable doPostCleanup(Throwable accumulate)
{
accumulate = summary.close(accumulate);
accumulate = bf.close(accumulate);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index 8203a37..fd88151 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -50,7 +50,7 @@ public class ChecksummedSequentialWriter extends SequentialWriter
@Override
protected Throwable doCommit(Throwable accumulate)
{
- return crcWriter.commit(accumulate);
+ return super.doCommit(crcWriter.commit(accumulate));
}
@Override
@@ -66,9 +66,6 @@ public class ChecksummedSequentialWriter extends SequentialWriter
if (descriptor != null)
crcMetadata.writeFullChecksum(descriptor);
crcWriter.setDescriptor(descriptor).prepareToCommit();
- // we must cleanup our file handles during prepareCommit for Windows compatibility as we cannot rename an open file;
- // TODO: once we stop file renaming, remove this for clarity
- releaseFileHandle();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 6000f95..5bdc15a 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -68,8 +68,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
// due to lack of multiple-inheritance, we proxy our transactional implementation
protected class TransactionalProxy extends AbstractTransactional
{
- private boolean deleteFile = true;
-
@Override
protected Throwable doPreCleanup(Throwable accumulate)
{
@@ -90,9 +88,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
protected void doPrepare()
{
syncInternal();
- // we must cleanup our file handles during prepareCommit for Windows compatibility as we cannot rename an open file;
- // TODO: once we stop file renaming, remove this for clarity
- releaseFileHandle();
}
protected Throwable doCommit(Throwable accumulate)
@@ -102,10 +97,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
protected Throwable doAbort(Throwable accumulate)
{
- if (deleteFile)
- return FileUtils.deleteWithConfirm(filePath, false, accumulate);
- else
- return accumulate;
+ return accumulate;
}
}
@@ -409,23 +401,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
return new TransactionalProxy();
}
- public void deleteFile(boolean val)
- {
- txnProxy.deleteFile = val;
- }
-
- public void releaseFileHandle()
- {
- try
- {
- channel.close();
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, filePath);
- }
- }
-
/**
* Class to hold a mark to the position of the file
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index 02562ce..d142f06 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -88,7 +88,8 @@ public interface Transactional extends AutoCloseable
// Transactional objects will perform cleanup in the commit() or abort() calls
/**
- * perform an exception-safe pre-abort cleanup; this will still be run *after* commit
+ * perform an exception-safe pre-abort/commit cleanup;
+ * this will be run after prepareToCommit (so before commit), and before abort
*/
protected Throwable doPreCleanup(Throwable accumulate){ return accumulate; }
@@ -113,7 +114,6 @@ public interface Transactional extends AutoCloseable
if (state != State.READY_TO_COMMIT)
throw new IllegalStateException("Cannot commit unless READY_TO_COMMIT; state is " + state);
accumulate = doCommit(accumulate);
- accumulate = doPreCleanup(accumulate);
accumulate = doPostCleanup(accumulate);
state = State.COMMITTED;
return accumulate;
@@ -171,6 +171,7 @@ public interface Transactional extends AutoCloseable
throw new IllegalStateException("Cannot prepare to commit unless IN_PROGRESS; state is " + state);
doPrepare();
+ maybeFail(doPreCleanup(null));
state = State.READY_TO_COMMIT;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 1bdc591..56c83da 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -222,7 +222,6 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
protected void assertAborted() throws Exception
{
super.assertAborted();
- Assert.assertFalse(offsetsFile.exists());
}
void cleanup()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index faa9c3e..ad7523d 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -131,11 +131,14 @@ public class SSTableLoaderTest
writer.addRow("key1", "col1", "100");
}
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+ cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them
+
final CountDownLatch latch = new CountDownLatch(1);
SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
- List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1)).build());
+ List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
assertEquals(1, partitions.size());
assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
@@ -175,6 +178,9 @@ public class SSTableLoaderTest
writer.addRow(String.format("key%d", i), String.format("col%d", j), "100");
}
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2);
+ cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them
+
//make sure we have some tables...
assertTrue(dataDir.listFiles().length > 0);
@@ -183,7 +189,7 @@ public class SSTableLoaderTest
SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
- List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
+ List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
assertTrue(partitions.size() > 0 && partitions.size() < NB_PARTITIONS);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
index 9731a8d..bea3aac 100644
--- a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
@@ -85,7 +85,6 @@ public class ChecksummedSequentialWriterTest extends SequentialWriterTest
protected void assertAborted() throws Exception
{
super.assertAborted();
- Assert.assertFalse(crcFile.exists());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index fd38427..f5a366e 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -102,7 +102,6 @@ public class SequentialWriterTest extends AbstractTransactionalTest
protected void assertAborted() throws Exception
{
Assert.assertFalse(writer.isOpen());
- Assert.assertFalse(file.exists());
}
protected void assertCommitted() throws Exception
[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aefb2a6f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aefb2a6f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aefb2a6f
Branch: refs/heads/trunk
Commit: aefb2a6ff8ff279872837b0c6fd5eb00c19461f6
Parents: bcba9b7 c163d0b
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Sep 18 13:19:45 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 18 13:19:45 2015 +0100
----------------------------------------------------------------------
.../io/compress/CompressedSequentialWriter.java | 3 +--
.../io/compress/CompressionMetadata.java | 4 +--
.../cassandra/io/sstable/SSTableTxnWriter.java | 4 +--
.../io/sstable/format/big/BigTableWriter.java | 8 ++----
.../io/util/ChecksummedSequentialWriter.java | 5 +---
.../cassandra/io/util/SequentialWriter.java | 27 +-------------------
.../utils/concurrent/Transactional.java | 5 ++--
.../CompressedSequentialWriterTest.java | 1 -
.../cassandra/io/sstable/SSTableLoaderTest.java | 10 ++++++--
.../util/ChecksummedSequentialWriterTest.java | 1 -
.../cassandra/io/util/SequentialWriterTest.java | 1 -
11 files changed, 20 insertions(+), 49 deletions(-)
----------------------------------------------------------------------