You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/17 14:51:06 UTC
[4/7] cassandra git commit: Introduce Transactional API for internal
state changes
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 3104a96..d39da61 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import static org.junit.Assert.*;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
public class SSTableRewriterTest extends SchemaLoader
{
@@ -93,8 +94,8 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.forceBlockingFlush();
Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
assertEquals(1, sstables.size());
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+ try (SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
{
ISSTableScanner scanner = scanners.scanners.get(0);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -104,9 +105,9 @@ public class SSTableRewriterTest extends SchemaLoader
AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
writer.append(row);
}
+ Collection<SSTableReader> newsstables = writer.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables , OperationType.COMPACTION);
}
- Collection<SSTableReader> newsstables = writer.finish();
- cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
SSTableDeletingTask.waitForDeletions();
validateCFS(cfs);
@@ -126,8 +127,8 @@ public class SSTableRewriterTest extends SchemaLoader
Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
assertEquals(1, sstables.size());
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+ try (SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
{
ISSTableScanner scanner = scanners.scanners.get(0);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -137,9 +138,9 @@ public class SSTableRewriterTest extends SchemaLoader
AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
writer.append(row);
}
+ Collection<SSTableReader> newsstables = writer.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
}
- Collection<SSTableReader> newsstables = writer.finish();
- cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
SSTableDeletingTask.waitForDeletions();
validateCFS(cfs);
@@ -159,9 +160,9 @@ public class SSTableRewriterTest extends SchemaLoader
Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
assertEquals(1, sstables.size());
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
boolean checked = false;
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+ try (SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
{
ISSTableScanner scanner = scanners.scanners.get(0);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -191,10 +192,10 @@ public class SSTableRewriterTest extends SchemaLoader
}
}
}
+ assertTrue(checked);
+ Collection<SSTableReader> newsstables = writer.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
}
- assertTrue(checked);
- Collection<SSTableReader> newsstables = writer.finish();
- cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
SSTableDeletingTask.waitForDeletions();
validateCFS(cfs);
@@ -216,17 +217,17 @@ public class SSTableRewriterTest extends SchemaLoader
for (int i = 0; i < 100; i++)
cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
File dir = cfs.directories.getDirectoryForNewSSTables();
- SSTableWriter writer = getWriter(cfs, dir);
- try
+
+ try (SSTableWriter writer = getWriter(cfs, dir);)
{
for (int i = 0; i < 10000; i++)
writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
- SSTableReader s = writer.openEarly(1000);
+ SSTableReader s = writer.setMaxDataAge(1000).openEarly();
assert s != null;
assertFileCounts(dir.list(), 2, 2);
for (int i = 10000; i < 20000; i++)
writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
- SSTableReader s2 = writer.openEarly(1000);
+ SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
assertTrue(s.last.compareTo(s2.last) < 0);
assertFileCounts(dir.list(), 2, 2);
s.markObsolete();
@@ -245,11 +246,6 @@ public class SSTableRewriterTest extends SchemaLoader
assertEquals(datafiles, 0);
validateCFS(cfs);
}
- catch (Throwable t)
- {
- writer.abort();
- throw t;
- }
}
@Test
@@ -264,13 +260,14 @@ public class SSTableRewriterTest extends SchemaLoader
long startStorageMetricsLoad = StorageMetrics.load.getCount();
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ List<SSTableReader> sstables;
int files = 1;
- try (ISSTableScanner scanner = s.getScanner();
+ try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0))
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while(scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -284,9 +281,10 @@ public class SSTableRewriterTest extends SchemaLoader
}
}
+ sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ assertEquals(files, sstables.size());
}
- List<SSTableReader> sstables = rewriter.finish();
- cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
long sum = 0;
for (SSTableReader x : cfs.getSSTables())
sum += x.bytesOnDisk();
@@ -314,13 +312,14 @@ public class SSTableRewriterTest extends SchemaLoader
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ List<SSTableReader> sstables;
int files = 1;
- try (ISSTableScanner scanner = s.getScanner();
+ try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0))
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while(scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -331,14 +330,9 @@ public class SSTableRewriterTest extends SchemaLoader
assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
}
}
- }
- catch (Throwable t)
- {
- rewriter.abort();
- throw t;
+ sstables = rewriter.finish();
}
- List<SSTableReader> sstables = rewriter.finish();
assertEquals(files, sstables.size());
assertEquals(files, cfs.getSSTables().size());
assertEquals(1, cfs.getDataTracker().getView().shadowed.size());
@@ -445,19 +439,14 @@ public class SSTableRewriterTest extends SchemaLoader
long startSize = cfs.metric.liveDiskSpaceUsed.getCount();
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- try (ISSTableScanner scanner = s.getScanner();
+ try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0))
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
test.run(scanner, controller, s, cfs, rewriter);
}
- catch (Throwable t)
- {
- rewriter.abort();
- throw t;
- }
SSTableDeletingTask.waitForDeletions();
@@ -481,13 +470,13 @@ public class SSTableRewriterTest extends SchemaLoader
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
int files = 1;
- try (ISSTableScanner scanner = s.getScanner();
+ try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0))
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while(scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -506,11 +495,6 @@ public class SSTableRewriterTest extends SchemaLoader
}
}
}
- catch (Throwable t)
- {
- rewriter.abort();
- throw t;
- }
SSTableDeletingTask.waitForDeletions();
@@ -531,13 +515,14 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ List<SSTableReader> sstables;
int files = 1;
- try (ISSTableScanner scanner = s.getScanner();
+ try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0))
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while(scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -548,15 +533,11 @@ public class SSTableRewriterTest extends SchemaLoader
assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
}
}
- }
- catch (Throwable t)
- {
- rewriter.abort();
- throw t;
+
+ sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
}
- List<SSTableReader> sstables = rewriter.finish();
- cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
SSTableDeletingTask.waitForDeletions();
assertFileCounts(s.descriptor.directory.list(), 0, 0);
cfs.truncateBlocking();
@@ -576,13 +557,14 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(1000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ List<SSTableReader> sstables;
int files = 1;
- try (ISSTableScanner scanner = s.getScanner();
+ try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0))
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while(scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -593,15 +575,11 @@ public class SSTableRewriterTest extends SchemaLoader
files++;
}
}
- }
- catch (Throwable t)
- {
- rewriter.abort();
- throw t;
+
+ sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
}
- List<SSTableReader> sstables = rewriter.finish();
- cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
assertEquals(files, sstables.size());
assertEquals(files, cfs.getSSTables().size());
SSTableDeletingTask.waitForDeletions();
@@ -668,12 +646,12 @@ public class SSTableRewriterTest extends SchemaLoader
Set<SSTableReader> compacting = Sets.newHashSet(s);
cfs.getDataTracker().markCompacting(compacting);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline);
- SSTableWriter w = getWriter(cfs, s.descriptor.directory);
- rewriter.switchWriter(w);
- try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
+
+ try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline);
+ ISSTableScanner scanner = compacting.iterator().next().getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0))
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while (scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -684,7 +662,8 @@ public class SSTableRewriterTest extends SchemaLoader
}
try
{
- rewriter.finishAndThrow(earlyException);
+ rewriter.throwDuringPrepare(earlyException);
+ rewriter.prepareToCommit();
}
catch (Throwable t)
{
@@ -749,14 +728,14 @@ public class SSTableRewriterTest extends SchemaLoader
compacting.add(s);
cfs.getDataTracker().markCompacting(compacting);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+
SSTableRewriter.overrideOpenInterval(1);
- SSTableWriter w = getWriter(cfs, s.descriptor.directory);
- rewriter.switchWriter(w);
int keyCount = 0;
- try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
+ try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ ISSTableScanner scanner = compacting.iterator().next().getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0))
{
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
while (scanner.hasNext())
{
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -841,11 +820,12 @@ public class SSTableRewriterTest extends SchemaLoader
File dir = cfs.directories.getDirectoryForNewSSTables();
String filename = cfs.getTempSSTablePath(dir);
- SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
-
- for (int i = 0; i < count * 5; i++)
- writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
- return writer.closeAndOpenReader();
+ try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0);)
+ {
+ for (int i = 0; i < count * 5; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ return writer.finish(true);
+ }
}
private void validateCFS(ColumnFamilyStore cfs)
@@ -899,7 +879,7 @@ public class SSTableRewriterTest extends SchemaLoader
return SSTableWriter.create(filename, 0, 0);
}
- private ByteBuffer random(int i, int size)
+ public static ByteBuffer random(int i, int size)
{
byte[] bytes = new byte[size + 4];
ThreadLocalRandom.current().nextBytes(bytes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index d2559cd..a116b84 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -217,7 +217,7 @@ public class SSTableUtils
File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0);
while (appender.append(writer)) { /* pass */ }
- SSTableReader reader = writer.closeAndOpenReader();
+ SSTableReader reader = writer.finish(true);
// mark all components for removal
if (cleanup)
for (Component component : reader.components)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index a40034d..0ff4b01 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -128,7 +128,7 @@ public class BufferedRandomAccessFileTest
assert data[i] == 0;
}
- w.close();
+ w.finish();
r.close();
}
@@ -153,7 +153,7 @@ public class BufferedRandomAccessFileTest
assert negone == -1 : "We read past the end of the file, should have gotten EOF -1. Instead, " + negone;
r.close();
- w.close();
+ w.finish();
}
@Test
@@ -178,7 +178,7 @@ public class BufferedRandomAccessFileTest
w.write(biggerThenBuffer);
assertEquals(biggerThenBuffer.length + lessThenBuffer.length, w.length());
- w.close();
+ w.finish();
// will use cachedlength
RandomAccessReader r = RandomAccessReader.open(tmpFile);
@@ -223,7 +223,7 @@ public class BufferedRandomAccessFileTest
}
});
- w.close();
+ w.finish();
r.close();
}
@@ -233,7 +233,7 @@ public class BufferedRandomAccessFileTest
SequentialWriter w = createTempFile("brafSeek");
byte[] data = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE + 20);
w.write(data);
- w.close();
+ w.finish();
final RandomAccessReader file = RandomAccessReader.open(w);
@@ -272,7 +272,7 @@ public class BufferedRandomAccessFileTest
{
SequentialWriter w = createTempFile("brafSkipBytes");
w.write(generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE * 2));
- w.close();
+ w.finish();
RandomAccessReader file = RandomAccessReader.open(w);
@@ -320,7 +320,7 @@ public class BufferedRandomAccessFileTest
r.read(new byte[4]);
assertEquals(r.getFilePointer(), 20);
- w.close();
+ w.finish();
r.close();
}
@@ -329,7 +329,7 @@ public class BufferedRandomAccessFileTest
{
SequentialWriter file = createTempFile("brafGetPath");
assert file.getPath().contains("brafGetPath");
- file.close();
+ file.finish();
}
@Test
@@ -411,7 +411,7 @@ public class BufferedRandomAccessFileTest
r.skipBytes(10);
assertEquals(r.bytesRemaining(), r.length() - 10);
- w.close();
+ w.finish();
r.close();
}
@@ -443,7 +443,7 @@ public class BufferedRandomAccessFileTest
byte[] data = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE + 20);
w.write(data);
- w.close(); // will flush
+ w.finish();
final RandomAccessReader r = RandomAccessReader.open(new File(w.getPath()));
@@ -481,7 +481,7 @@ public class BufferedRandomAccessFileTest
SequentialWriter w = createTempFile("brafTestMark");
w.write(new byte[30]);
- w.close();
+ w.finish();
RandomAccessReader file = RandomAccessReader.open(w);
@@ -542,10 +542,10 @@ public class BufferedRandomAccessFileTest
SequentialWriter w2 = createTempFile("fscache2");
w1.write(new byte[30]);
- w1.close();
+ w1.finish();
w2.write(new byte[30]);
- w2.close();
+ w2.finish();
for (int i = 0; i < 20; i++)
{
@@ -652,7 +652,7 @@ public class BufferedRandomAccessFileTest
assertEquals(new String(content), "cccccccccc");
- file.close();
+ file.finish();
copy.close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
new file mode 100644
index 0000000..9731a8d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
@@ -0,0 +1,92 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+
+import junit.framework.Assert;
+
+public class ChecksummedSequentialWriterTest extends SequentialWriterTest
+{
+
+ private final List<TestableCSW> writers = new ArrayList<>();
+
+ @After
+ public void cleanup()
+ {
+ for (TestableSW sw : writers)
+ sw.file.delete();
+ writers.clear();
+ }
+
+ protected TestableTransaction newTest() throws IOException
+ {
+ TestableCSW sw = new TestableCSW();
+ writers.add(sw);
+ return sw;
+ }
+
+ private static class TestableCSW extends TestableSW
+ {
+ final File crcFile;
+
+ private TestableCSW() throws IOException
+ {
+ this(tempFile("compressedsequentialwriter"),
+ tempFile("compressedsequentialwriter.checksum"));
+ }
+
+ private TestableCSW(File file, File crcFile) throws IOException
+ {
+ this(file, crcFile, new ChecksummedSequentialWriter(file, BUFFER_SIZE, crcFile));
+ }
+
+ private TestableCSW(File file, File crcFile, SequentialWriter sw) throws IOException
+ {
+ super(file, sw);
+ this.crcFile = crcFile;
+ }
+
+ protected void assertInProgress() throws Exception
+ {
+ super.assertInProgress();
+ Assert.assertTrue(crcFile.exists());
+ Assert.assertEquals(0, crcFile.length());
+ }
+
+ protected void assertPrepared() throws Exception
+ {
+ super.assertPrepared();
+ Assert.assertTrue(crcFile.exists());
+ Assert.assertFalse(0 == crcFile.length());
+ }
+
+ protected void assertAborted() throws Exception
+ {
+ super.assertAborted();
+ Assert.assertFalse(crcFile.exists());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index b15da47..ec280fa 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -180,6 +180,7 @@ public class DataOutputTest
DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
DataInput canon = testWrite(write);
write.flush();
+ writer.finish();
write.close();
DataInputStream test = new DataInputStream(new FileInputStream(file));
testRead(test, canon);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
new file mode 100644
index 0000000..ef52030
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -0,0 +1,117 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.After;
+
+import junit.framework.Assert;
+import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
+
+import static org.apache.commons.io.FileUtils.*;
+
+public class SequentialWriterTest extends AbstractTransactionalTest
+{
+
+ private final List<TestableSW> writers = new ArrayList<>();
+
+ @After
+ public void cleanup()
+ {
+ for (TestableSW sw : writers)
+ sw.file.delete();
+ writers.clear();
+ }
+
+ protected TestableTransaction newTest() throws IOException
+ {
+ TestableSW sw = new TestableSW();
+ writers.add(sw);
+ return sw;
+ }
+
+ protected static class TestableSW extends TestableTransaction
+ {
+ protected static final int BUFFER_SIZE = 8 << 10;
+ protected final File file;
+ protected final SequentialWriter writer;
+ protected final byte[] fullContents, partialContents;
+
+ protected TestableSW() throws IOException
+ {
+ this(tempFile("sequentialwriter"));
+ }
+
+ protected TestableSW(File file) throws IOException
+ {
+ this(file, new SequentialWriter(file, 8 << 10, true));
+ }
+
+ protected TestableSW(File file, SequentialWriter sw) throws IOException
+ {
+ super(sw);
+ this.file = file;
+ this.writer = sw;
+ fullContents = new byte[BUFFER_SIZE + BUFFER_SIZE / 2];
+ ThreadLocalRandom.current().nextBytes(fullContents);
+ partialContents = Arrays.copyOf(fullContents, BUFFER_SIZE);
+ sw.write(fullContents);
+ }
+
+ protected void assertInProgress() throws Exception
+ {
+ Assert.assertTrue(file.exists());
+ byte[] bytes = readFileToByteArray(file);
+ Assert.assertTrue(Arrays.equals(partialContents, bytes));
+ }
+
+ protected void assertPrepared() throws Exception
+ {
+ Assert.assertTrue(file.exists());
+ byte[] bytes = readFileToByteArray(file);
+ Assert.assertTrue(Arrays.equals(fullContents, bytes));
+ }
+
+ protected void assertAborted() throws Exception
+ {
+ Assert.assertFalse(writer.isOpen());
+ Assert.assertFalse(file.exists());
+ }
+
+ protected void assertCommitted() throws Exception
+ {
+ assertPrepared();
+ Assert.assertFalse(writer.isOpen());
+ }
+
+ protected static File tempFile(String prefix)
+ {
+ File file = FileUtils.createTempFile(prefix, "test");
+ file.delete();
+ return file;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 016deb3..20371c3 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -76,7 +76,7 @@ public class CompressedInputStreamTest
index.put(l, writer.getFilePointer());
writer.stream.writeLong(l);
}
- writer.close();
+ writer.finish();
CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
index 7e59207..bc73c83 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
@@ -109,7 +109,7 @@ public class SSTableExportTest
writer.append(Util.dk("rowB"), cfamily);
cfamily.clear();
- writer.closeAndOpenReader();
+ writer.finish(true);
// Enumerate and verify
File temp = File.createTempFile("Standard1", ".txt");
@@ -153,7 +153,7 @@ public class SSTableExportTest
writer.append(Util.dk("rowExclude"), cfamily);
cfamily.clear();
- SSTableReader reader = writer.closeAndOpenReader();
+ SSTableReader reader = writer.finish(true);
// Export to JSON and verify
File tempJson = File.createTempFile("Standard1", ".json");
@@ -202,7 +202,7 @@ public class SSTableExportTest
writer.append(Util.dk("rowExclude"), cfamily);
cfamily.clear();
- SSTableReader reader = writer.closeAndOpenReader();
+ SSTableReader reader = writer.finish(true);
// Export to JSON and verify
File tempJson = File.createTempFile("Standard1", ".json");
@@ -237,7 +237,7 @@ public class SSTableExportTest
writer.append(Util.dk("rowA"), cfamily);
cfamily.clear();
- SSTableReader reader = writer.closeAndOpenReader();
+ SSTableReader reader = writer.finish(true);
// Export to JSON and verify
File tempJson = File.createTempFile("Counter1", ".json");
@@ -268,7 +268,7 @@ public class SSTableExportTest
writer.append(Util.dk("rowA"), cfamily);
cfamily.clear();
- SSTableReader reader = writer.closeAndOpenReader();
+ SSTableReader reader = writer.finish(true);
// Export to JSON and verify
File tempJson = File.createTempFile("ValuesWithQuotes", ".json");
@@ -300,7 +300,7 @@ public class SSTableExportTest
cfamily.delete(new DeletionInfo(0, 0));
writer.append(Util.dk("rowA"), cfamily);
- SSTableReader reader = writer.closeAndOpenReader();
+ SSTableReader reader = writer.finish(true);
// Export to JSON and verify
File tempJson = File.createTempFile("CFWithDeletionInfo", ".json");
SSTableExport.export(reader, new PrintStream(tempJson.getPath()), new String[0]);
@@ -359,7 +359,7 @@ public class SSTableExportTest
cfamily.addColumn(column(CFMetaData.DEFAULT_KEY_ALIAS, "not a uuid", 1L));
writer.append(Util.dk(ByteBufferUtil.bytes(UUIDGen.getTimeUUID())), cfamily);
- SSTableReader reader = writer.closeAndOpenReader();
+ SSTableReader reader = writer.finish(true);
// Export to JSON and verify
File tempJson = File.createTempFile("CFWithColumnNameEqualToDefaultKeyAlias", ".json");
SSTableExport.export(reader, new PrintStream(tempJson.getPath()), new String[0]);
@@ -388,7 +388,7 @@ public class SSTableExportTest
cfamily.addColumn(column("column", "value", 1L));
writer.append(Util.dk("key", AsciiType.instance), cfamily);
- SSTableReader reader = writer.closeAndOpenReader();
+ SSTableReader reader = writer.finish(true);
// Export to JSON and verify
File tempJson = File.createTempFile("CFWithAsciiKeys", ".json");
SSTableExport.export(reader,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
new file mode 100644
index 0000000..4e160c2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+@Ignore
+public abstract class AbstractTransactionalTest
+{
+
+ protected abstract TestableTransaction newTest() throws Exception;
+
+ @Test
+ public void testNoPrepare() throws Exception
+ {
+ TestableTransaction txn;
+
+ txn = newTest();
+ txn.assertInProgress();
+ txn.testing.close();
+ txn.assertAborted();
+
+ txn = newTest();
+ txn.assertInProgress();
+ Assert.assertNull(txn.testing.abort(null));
+ txn.assertAborted();
+ }
+
+ @Test
+ public void testPrepare() throws Exception
+ {
+ TestableTransaction txn;
+ txn = newTest();
+ txn.assertInProgress();
+ txn.testing.prepareToCommit();
+ txn.assertPrepared();
+ txn.testing.close();
+ txn.assertAborted();
+
+ txn = newTest();
+ txn.assertInProgress();
+ txn.testing.prepareToCommit();
+ txn.assertPrepared();
+ Assert.assertNull(txn.testing.abort(null));
+ txn.assertAborted();
+ }
+
+ @Test
+ public void testCommit() throws Exception
+ {
+ TestableTransaction txn = newTest();
+ txn.assertInProgress();
+ txn.testing.prepareToCommit();
+ txn.assertPrepared();
+ Assert.assertNull(txn.testing.commit(null));
+ txn.assertCommitted();
+ txn.testing.close();
+ txn.assertCommitted();
+ Throwable t = txn.testing.abort(null);
+ Assert.assertTrue(t instanceof IllegalStateException);
+ txn.assertCommitted();
+ }
+
+ @Test
+ public void testThrowableReturn() throws Exception
+ {
+ TestableTransaction txn;
+ txn = newTest();
+ Throwable t = new RuntimeException();
+ txn.testing.prepareToCommit();
+ Assert.assertEquals(t, txn.testing.commit(t));
+ Assert.assertEquals(t, txn.testing.abort(t));
+ Assert.assertTrue(t.getSuppressed()[0] instanceof IllegalStateException);
+ }
+
+ @Test
+ public void testBadCommit() throws Exception
+ {
+ TestableTransaction txn;
+ txn = newTest();
+ try
+ {
+ txn.testing.commit(null);
+ Assert.assertTrue(false);
+ }
+ catch (IllegalStateException t)
+ {
+ }
+ txn.assertInProgress();
+ Assert.assertNull(txn.testing.abort(null));
+ txn.assertAborted();
+ try
+ {
+ txn.testing.commit(null);
+ Assert.assertTrue(false);
+ }
+ catch (IllegalStateException t)
+ {
+ }
+ txn.assertAborted();
+ }
+
+
+ public static abstract class TestableTransaction
+ {
+ final Transactional testing;
+ public TestableTransaction(Transactional transactional)
+ {
+ this.testing = transactional;
+ }
+
+ protected abstract void assertInProgress() throws Exception;
+ protected abstract void assertPrepared() throws Exception;
+ protected abstract void assertAborted() throws Exception;
+ protected abstract void assertCommitted() throws Exception;
+ }
+}