You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/03/17 17:07:03 UTC
[07/12] cassandra git commit: Support streaming of older version
sstables in 3.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/data/legacy-sstables/ma/legacy_tables/legacy_ma_simple_counter_compact/ma-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/ma/legacy_tables/legacy_ma_simple_counter_compact/ma-1-big-TOC.txt b/test/data/legacy-sstables/ma/legacy_tables/legacy_ma_simple_counter_compact/ma-1-big-TOC.txt
new file mode 100644
index 0000000..a29a600
--- /dev/null
+++ b/test/data/legacy-sstables/ma/legacy_tables/legacy_ma_simple_counter_compact/ma-1-big-TOC.txt
@@ -0,0 +1,8 @@
+Index.db
+Statistics.db
+Digest.crc32
+Summary.db
+Data.db
+TOC.txt
+Filter.db
+CompressionInfo.db
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 501f4ae..3a1f348 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
-import org.apache.cassandra.io.util.NIODataInputStream;
import org.apache.cassandra.net.MessagingService;
import java.io.File;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 8732881..f864bbc 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -243,46 +243,76 @@ public class DirectoriesTest
for (CFMetaData cfm : CFM)
{
Directories directories = new Directories(cfm);
- Directories.SSTableLister lister;
- Set<File> listed;
+ checkFiles(cfm, directories);
+ }
+ }
- // List all but no snapshot, backup
- lister = directories.sstableLister(Directories.OnTxnErr.THROW);
- listed = new HashSet<>(lister.listFiles());
- for (File f : files.get(cfm.cfName))
- {
- if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
- assert !listed.contains(f) : f + " should not be listed";
- else
- assert listed.contains(f) : f + " is missing";
- }
+ private void checkFiles(CFMetaData cfm, Directories directories)
+ {
+ Directories.SSTableLister lister;
+ Set<File> listed;// List all but no snapshot, backup
+ lister = directories.sstableLister(Directories.OnTxnErr.THROW);
+ listed = new HashSet<>(lister.listFiles());
+ for (File f : files.get(cfm.cfName))
+ {
+ if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
+ assertFalse(f + " should not be listed", listed.contains(f));
+ else
+ assertTrue(f + " is missing", listed.contains(f));
+ }
- // List all but including backup (but no snapshot)
- lister = directories.sstableLister(Directories.OnTxnErr.THROW).includeBackups(true);
- listed = new HashSet<>(lister.listFiles());
- for (File f : files.get(cfm.cfName))
- {
- if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR))
- assert !listed.contains(f) : f + " should not be listed";
- else
- assert listed.contains(f) : f + " is missing";
- }
+ // List all but including backup (but no snapshot)
+ lister = directories.sstableLister(Directories.OnTxnErr.THROW).includeBackups(true);
+ listed = new HashSet<>(lister.listFiles());
+ for (File f : files.get(cfm.cfName))
+ {
+ if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR))
+ assertFalse(f + " should not be listed", listed.contains(f));
+ else
+ assertTrue(f + " is missing", listed.contains(f));
+ }
- // Skip temporary and compacted
- lister = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
- listed = new HashSet<>(lister.listFiles());
- for (File f : files.get(cfm.cfName))
- {
- if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
- assert !listed.contains(f) : f + " should not be listed";
- else if (f.getName().contains("tmp-"))
- assert !listed.contains(f) : f + " should not be listed";
- else
- assert listed.contains(f) : f + " is missing";
- }
+ // Skip temporary and compacted
+ lister = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+ listed = new HashSet<>(lister.listFiles());
+ for (File f : files.get(cfm.cfName))
+ {
+ if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
+ assertFalse(f + " should not be listed", listed.contains(f));
+ else if (f.getName().contains("tmp-"))
+ assertFalse(f + " should not be listed", listed.contains(f));
+ else
+ assertTrue(f + " is missing", listed.contains(f));
}
}
+ @Test
+ public void testTemporaryFile() throws IOException
+ {
+ for (CFMetaData cfm : CFM)
+ {
+ Directories directories = new Directories(cfm);
+
+ File tempDir = directories.getTemporaryWriteableDirectoryAsFile(10);
+ tempDir.mkdir();
+ File tempFile = new File(tempDir, "tempFile");
+ tempFile.createNewFile();
+
+ assertTrue(tempDir.exists());
+ assertTrue(tempFile.exists());
+
+ //make sure temp dir/file will not affect existing sstable listing
+ checkFiles(cfm, directories);
+
+ directories.removeTemporaryDirectories();
+
+ //make sure temp dir/file deletion will not affect existing sstable listing
+ checkFiles(cfm, directories);
+
+ assertFalse(tempDir.exists());
+ assertFalse(tempFile.exists());
+ }
+ }
@Test
public void testDiskFailurePolicy_best_effort()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index 8b7ad1f..e50b461 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.gms;
import org.apache.cassandra.AbstractSerializationsTester;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 9154d79..74b5c74 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
import org.apache.cassandra.io.util.MmappedRegions;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.SequentialWriter;
@@ -84,7 +84,7 @@ public class CompressedRandomAccessReaderTest
for (int i = 0; i < 20; i++)
writer.write("x".getBytes());
- FileMark mark = writer.mark();
+ DataPosition mark = writer.mark();
// write enough garbage to create new chunks:
for (int i = 0; i < 40; ++i)
writer.write("y".getBytes());
@@ -126,7 +126,7 @@ public class CompressedRandomAccessReaderTest
: SequentialWriter.open(f))
{
writer.write("The quick ".getBytes());
- FileMark mark = writer.mark();
+ DataPosition mark = writer.mark();
writer.write("blue fox jumps over the lazy dog".getBytes());
// write enough to be sure to change chunk
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 56c83da..9b09f0b 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.SequentialWriterTest;
import org.apache.cassandra.schema.CompressionParams;
@@ -104,7 +104,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
dataPost.flip();
writer.write(dataPre);
- FileMark mark = writer.mark();
+ DataPosition mark = writer.mark();
// Write enough garbage to transition chunk
for (int i = 0; i < CompressionParams.DEFAULT_CHUNK_LENGTH; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 00727b8..4b9a769 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -21,14 +21,13 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Random;
-import java.util.Set;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@@ -37,16 +36,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -55,7 +48,6 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
-import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamPlan;
@@ -63,8 +55,6 @@ import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-
/**
* Tests backwards compatibility for SSTables
*/
@@ -73,10 +63,7 @@ public class LegacySSTableTest
private static final Logger logger = LoggerFactory.getLogger(LegacySSTableTest.class);
public static final String LEGACY_SSTABLE_PROP = "legacy-sstable-root";
- public static final String KSNAME = "Keyspace1";
- public static final String CFNAME = "Standard1";
- public static Set<String> TEST_DATA;
public static File LEGACY_SSTABLE_ROOT;
/**
@@ -104,58 +91,78 @@ public class LegacySSTableTest
public static void defineSchema() throws ConfigurationException
{
SchemaLoader.prepareServer();
-
- CFMetaData metadata = CFMetaData.Builder.createDense(KSNAME, CFNAME, false, false)
- .addPartitionKey("key", BytesType.instance)
- .addClusteringColumn("column", BytesType.instance)
- .addRegularColumn("value", BytesType.instance)
- .build();
-
- SchemaLoader.createKeyspace(KSNAME,
- KeyspaceParams.simple(1),
- metadata);
- beforeClass();
- }
-
- public static void beforeClass()
- {
+ StorageService.instance.initServer();
Keyspace.setInitialized();
+ createKeyspace();
+ for (String legacyVersion : legacyVersions)
+ {
+ createTables(legacyVersion);
+ }
String scp = System.getProperty(LEGACY_SSTABLE_PROP);
assert scp != null;
LEGACY_SSTABLE_ROOT = new File(scp).getAbsoluteFile();
assert LEGACY_SSTABLE_ROOT.isDirectory();
+ }
- TEST_DATA = new HashSet<String>();
- for (int i = 100; i < 1000; ++i)
- TEST_DATA.add(Integer.toString(i));
+ @After
+ public void tearDown()
+ {
+ for (String legacyVersion : legacyVersions)
+ {
+ truncateTables(legacyVersion);
+ }
}
/**
* Get a descriptor for the legacy sstable at the given version.
*/
- protected Descriptor getDescriptor(String ver)
+ protected Descriptor getDescriptor(String legacyVersion, String table)
{
- File directory = new File(LEGACY_SSTABLE_ROOT + File.separator + ver + File.separator + KSNAME);
- return new Descriptor(ver, directory, KSNAME, CFNAME, 0, SSTableFormat.Type.LEGACY);
+ return new Descriptor(legacyVersion, getTableDir(legacyVersion, table), "legacy_tables", table, 1,
+ BigFormat.instance.getVersion(legacyVersion).hasNewFileName()?
+ SSTableFormat.Type.BIG :SSTableFormat.Type.LEGACY);
}
@Test
- public void testStreaming() throws Throwable
+ public void testLoadLegacyCqlTables() throws Exception
{
- StorageService.instance.initServer();
+ for (String legacyVersion : legacyVersions)
+ {
+ logger.info("Loading legacy version: {}", legacyVersion);
+ loadLegacyTables(legacyVersion);
+ CacheService.instance.invalidateKeyCache();
+ long startCount = CacheService.instance.keyCache.size();
+ verifyReads(legacyVersion);
+ verifyCache(legacyVersion, startCount);
+ }
+ }
+
+ @Test
+ public void testStreamLegacyCqlTables() throws Exception
+ {
+ for (String legacyVersion : legacyVersions)
+ {
+ streamLegacyTables(legacyVersion);
+ verifyReads(legacyVersion);
+ }
+ }
- for (File version : LEGACY_SSTABLE_ROOT.listFiles())
+ private void streamLegacyTables(String legacyVersion) throws Exception
+ {
+ for (int compact = 0; compact <= 1; compact++)
{
- if (!new File(LEGACY_SSTABLE_ROOT + File.separator + version.getName() + File.separator + KSNAME).isDirectory())
- continue;
- if (Version.validate(version.getName()) && SSTableFormat.Type.LEGACY.info.getVersion(version.getName()).isCompatibleForStreaming())
- testStreaming(version.getName());
+ logger.info("Streaming legacy version {}{}", legacyVersion, getCompactNameSuffix(compact));
+ streamLegacyTable("legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact));
+ streamLegacyTable("legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact));
+ streamLegacyTable("legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact));
+ streamLegacyTable("legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact));
}
}
- private void testStreaming(String version) throws Exception
+ private void streamLegacyTable(String tablePattern, String legacyVersion, String compactNameSuffix) throws Exception
{
- SSTableReader sstable = SSTableReader.open(getDescriptor(version));
+ String table = String.format(tablePattern, legacyVersion, compactNameSuffix);
+ SSTableReader sstable = SSTableReader.open(getDescriptor(legacyVersion, table));
IPartitioner p = sstable.getPartitioner();
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
@@ -166,94 +173,41 @@ public class LegacySSTableTest
sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)
.execute().get();
-
- ColumnFamilyStore cfs = Keyspace.open(KSNAME).getColumnFamilyStore(CFNAME);
- assert cfs.getLiveSSTables().size() == 1;
- sstable = cfs.getLiveSSTables().iterator().next();
- for (String keystring : TEST_DATA)
- {
- ByteBuffer key = bytes(keystring);
-
- SliceableUnfilteredRowIterator iter = sstable.iterator(Util.dk(key), ColumnFilter.selectionBuilder().add(cfs.metadata.getColumnDefinition(bytes("name"))).build(), false, false);
-
- // check not deleted (CASSANDRA-6527)
- assert iter.partitionLevelDeletion().equals(DeletionTime.LIVE);
- assert iter.next().clustering().get(0).equals(key);
- }
- sstable.selfRef().release();
- }
-
- @Test
- public void testVersions() throws Throwable
- {
- boolean notSkipped = false;
-
- for (File version : LEGACY_SSTABLE_ROOT.listFiles())
- {
- if (!new File(LEGACY_SSTABLE_ROOT + File.separator + version.getName() + File.separator + KSNAME).isDirectory())
- continue;
- if (Version.validate(version.getName()) && SSTableFormat.Type.LEGACY.info.getVersion(version.getName()).isCompatible())
- {
- notSkipped = true;
- testVersion(version.getName());
- }
- }
-
- assert notSkipped;
}
- public void testVersion(String version) throws Throwable
+ private static void loadLegacyTables(String legacyVersion) throws Exception
{
- try
- {
- ColumnFamilyStore cfs = Keyspace.open(KSNAME).getColumnFamilyStore(CFNAME);
-
-
- SSTableReader reader = SSTableReader.open(getDescriptor(version));
- for (String keystring : TEST_DATA)
- {
-
- ByteBuffer key = bytes(keystring);
-
- SliceableUnfilteredRowIterator iter = reader.iterator(Util.dk(key), ColumnFilter.selection(cfs.metadata.partitionColumns()), false, false);
-
- // check not deleted (CASSANDRA-6527)
- assert iter.partitionLevelDeletion().equals(DeletionTime.LIVE);
- assert iter.next().clustering().get(0).equals(key);
- }
-
- // TODO actually test some reads
- }
- catch (Throwable e)
+ for (int compact = 0; compact <= 1; compact++)
{
- System.err.println("Failed to read " + version);
- throw e;
+ logger.info("Preparing legacy version {}{}", legacyVersion, getCompactNameSuffix(compact));
+ loadLegacyTable("legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact));
+ loadLegacyTable("legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact));
+ loadLegacyTable("legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact));
+ loadLegacyTable("legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact));
}
}
- @Test
- public void testLegacyCqlTables() throws Exception
+ private static void verifyCache(String legacyVersion, long startCount) throws InterruptedException, java.util.concurrent.ExecutionException
{
- createKeyspace();
-
- loadLegacyTables();
+ //For https://issues.apache.org/jira/browse/CASSANDRA-10778
+ //Validate whether the key cache successfully saves in the presence of old keys as
+ //well as loads the correct number of keys
+ long endCount = CacheService.instance.keyCache.size();
+ Assert.assertTrue(endCount > startCount);
+ CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+ CacheService.instance.invalidateKeyCache();
+ Assert.assertEquals(startCount, CacheService.instance.keyCache.size());
+ CacheService.instance.keyCache.loadSaved();
+ if (BigFormat.instance.getVersion(legacyVersion).storeRows())
+ Assert.assertEquals(endCount, CacheService.instance.keyCache.size());
+ else
+ Assert.assertEquals(startCount, CacheService.instance.keyCache.size());
}
- private static void loadLegacyTables() throws Exception
+ private static void verifyReads(String legacyVersion)
{
- for (String legacyVersion : legacyVersions)
+ for (int compact = 0; compact <= 1; compact++)
{
- logger.info("Preparing legacy version {}", legacyVersion);
-
- createTables(legacyVersion);
-
- loadLegacyTable("legacy_%s_simple", legacyVersion);
- loadLegacyTable("legacy_%s_simple_counter", legacyVersion);
- loadLegacyTable("legacy_%s_clust", legacyVersion);
- loadLegacyTable("legacy_%s_clust_counter", legacyVersion);
-
- CacheService.instance.invalidateKeyCache();
- long startCount = CacheService.instance.keyCache.size();
for (int ck = 0; ck < 50; ck++)
{
String ckValue = Integer.toString(ck) + longString;
@@ -265,58 +219,94 @@ public class LegacySSTableTest
UntypedResultSet rs;
if (ck == 0)
{
- rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple WHERE pk=?", legacyVersion), pkValue);
- Assert.assertNotNull(rs);
- Assert.assertEquals(1, rs.size());
- Assert.assertEquals("foo bar baz", rs.one().getString("val"));
- rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple_counter WHERE pk=?", legacyVersion), pkValue);
- Assert.assertNotNull(rs);
- Assert.assertEquals(1, rs.size());
- Assert.assertEquals(1L, rs.one().getLong("val"));
+ readSimpleTable(legacyVersion, getCompactNameSuffix(compact), pkValue);
+ readSimpleCounterTable(legacyVersion, getCompactNameSuffix(compact), pkValue);
}
- rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust WHERE pk=? AND ck=?", legacyVersion), pkValue, ckValue);
- assertLegacyClustRows(1, rs);
-
- String ckValue2 = Integer.toString(ck < 10 ? 40 : ck - 1) + longString;
- String ckValue3 = Integer.toString(ck > 39 ? 10 : ck + 1) + longString;
- rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust WHERE pk=? AND ck IN (?, ?, ?)", legacyVersion), pkValue, ckValue, ckValue2, ckValue3);
- assertLegacyClustRows(3, rs);
-
- rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust_counter WHERE pk=? AND ck=?", legacyVersion), pkValue, ckValue);
- Assert.assertNotNull(rs);
- Assert.assertEquals(1, rs.size());
- Assert.assertEquals(1L, rs.one().getLong("val"));
+ readClusteringTable(legacyVersion, getCompactNameSuffix(compact), ck, ckValue, pkValue);
+ readClusteringCounterTable(legacyVersion, getCompactNameSuffix(compact), ckValue, pkValue);
}
}
-
- //For https://issues.apache.org/jira/browse/CASSANDRA-10778
- //Validate whether the key cache successfully saves in the presence of old keys as
- //well as loads the correct number of keys
- long endCount = CacheService.instance.keyCache.size();
- Assert.assertTrue(endCount > startCount);
- CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
- CacheService.instance.invalidateKeyCache();
- Assert.assertEquals(startCount, CacheService.instance.keyCache.size());
- CacheService.instance.keyCache.loadSaved();
- if (BigFormat.instance.getVersion(legacyVersion).storeRows())
- Assert.assertEquals(endCount, CacheService.instance.keyCache.size());
- else
- Assert.assertEquals(startCount, CacheService.instance.keyCache.size());
}
}
- private void createKeyspace()
+ private static void readClusteringCounterTable(String legacyVersion, String compactSuffix, String ckValue, String pkValue)
+ {
+ logger.debug("Read legacy_{}_clust_counter{}", legacyVersion, compactSuffix);
+ UntypedResultSet rs;
+ rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust_counter%s WHERE pk=? AND ck=?", legacyVersion, compactSuffix), pkValue, ckValue);
+ Assert.assertNotNull(rs);
+ Assert.assertEquals(1, rs.size());
+ Assert.assertEquals(1L, rs.one().getLong("val"));
+ }
+
+ private static void readClusteringTable(String legacyVersion, String compactSuffix, int ck, String ckValue, String pkValue)
+ {
+ logger.debug("Read legacy_{}_clust{}", legacyVersion, compactSuffix);
+ UntypedResultSet rs;
+ rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust%s WHERE pk=? AND ck=?", legacyVersion, compactSuffix), pkValue, ckValue);
+ assertLegacyClustRows(1, rs);
+
+ String ckValue2 = Integer.toString(ck < 10 ? 40 : ck - 1) + longString;
+ String ckValue3 = Integer.toString(ck > 39 ? 10 : ck + 1) + longString;
+ rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust%s WHERE pk=? AND ck IN (?, ?, ?)", legacyVersion, compactSuffix), pkValue, ckValue, ckValue2, ckValue3);
+ assertLegacyClustRows(3, rs);
+ }
+
+ private static void readSimpleCounterTable(String legacyVersion, String compactSuffix, String pkValue)
+ {
+ logger.debug("Read legacy_{}_simple_counter{}", legacyVersion, compactSuffix);
+ UntypedResultSet rs;
+ rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple_counter%s WHERE pk=?", legacyVersion, compactSuffix), pkValue);
+ Assert.assertNotNull(rs);
+ Assert.assertEquals(1, rs.size());
+ Assert.assertEquals(1L, rs.one().getLong("val"));
+ }
+
+ private static void readSimpleTable(String legacyVersion, String compactSuffix, String pkValue)
+ {
+ logger.debug("Read simple: legacy_{}_simple{}", legacyVersion, compactSuffix);
+ UntypedResultSet rs;
+ rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple%s WHERE pk=?", legacyVersion, compactSuffix), pkValue);
+ Assert.assertNotNull(rs);
+ Assert.assertEquals(1, rs.size());
+ Assert.assertEquals("foo bar baz", rs.one().getString("val"));
+ }
+
+ private static void createKeyspace()
{
QueryProcessor.executeInternal("CREATE KEYSPACE legacy_tables WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
}
private static void createTables(String legacyVersion)
{
- QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple (pk text PRIMARY KEY, val text)", legacyVersion));
- QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter (pk text PRIMARY KEY, val counter)", legacyVersion));
- QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust (pk text, ck text, val text, PRIMARY KEY (pk, ck))", legacyVersion));
- QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter (pk text, ck text, val counter, PRIMARY KEY (pk, ck))", legacyVersion));
+ for (int i=0; i<=1; i++)
+ {
+ String compactSuffix = getCompactNameSuffix(i);
+ String tableSuffix = i == 0? "" : " WITH COMPACT STORAGE";
+ QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple%s (pk text PRIMARY KEY, val text)%s", legacyVersion, compactSuffix, tableSuffix));
+ QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter%s (pk text PRIMARY KEY, val counter)%s", legacyVersion, compactSuffix, tableSuffix));
+ QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust%s (pk text, ck text, val text, PRIMARY KEY (pk, ck))%s", legacyVersion, compactSuffix, tableSuffix));
+ QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter%s (pk text, ck text, val counter, PRIMARY KEY (pk, ck))%s", legacyVersion, compactSuffix, tableSuffix));
+ }
+ }
+
+ private static String getCompactNameSuffix(int i)
+ {
+ return i == 0? "" : "_compact";
+ }
+
+ private static void truncateTables(String legacyVersion)
+ {
+ for (int compact = 0; compact <= 1; compact++)
+ {
+ QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact)));
+ QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact)));
+ QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact)));
+ QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact)));
+ }
+ CacheService.instance.invalidateCounterCache();
+ CacheService.instance.invalidateKeyCache();
}
private static void assertLegacyClustRows(int count, UntypedResultSet rs)
@@ -332,9 +322,9 @@ public class LegacySSTableTest
}
}
- private static void loadLegacyTable(String tablePattern, String legacyVersion) throws IOException
+ private static void loadLegacyTable(String tablePattern, String legacyVersion, String compactSuffix) throws IOException
{
- String table = String.format(tablePattern, legacyVersion);
+ String table = String.format(tablePattern, legacyVersion, compactSuffix);
logger.info("Loading legacy table {}", table);
@@ -349,7 +339,7 @@ public class LegacySSTableTest
}
/**
- * Generates sstables for 4 CQL tables (see {@link #createTables(String)}) in <i>current</i>
+ * Generates sstables for 8 CQL tables (see {@link #createTables(String)}) in <i>current</i>
* sstable format (version) into {@code test/data/legacy-sstables/VERSION}, where
* {@code VERSION} matches {@link Version#getVersion() BigFormat.latestVersion.getVersion()}.
* <p>
@@ -357,13 +347,10 @@ public class LegacySSTableTest
* during development. I.e. remove the {@code @Ignore} annotation temporarily.
* </p>
*/
- @Test
@Ignore
+ @Test
public void testGenerateSstables() throws Throwable
{
- createKeyspace();
- createTables(BigFormat.latestVersion.getVersion());
-
Random rand = new Random();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 128; i++)
@@ -372,25 +359,28 @@ public class LegacySSTableTest
}
String randomString = sb.toString();
- for (int pk = 0; pk < 5; pk++)
+ for (int compact = 0; compact <= 1; compact++)
{
- String valPk = Integer.toString(pk);
- QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_simple (pk, val) VALUES ('%s', '%s')",
- BigFormat.latestVersion, valPk, "foo bar baz"));
+ for (int pk = 0; pk < 5; pk++)
+ {
+ String valPk = Integer.toString(pk);
+ QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_simple%s (pk, val) VALUES ('%s', '%s')",
+ BigFormat.latestVersion, getCompactNameSuffix(compact), valPk, "foo bar baz"));
- QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_simple_counter SET val = val + 1 WHERE pk = '%s'",
- BigFormat.latestVersion, valPk));
+ QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_simple_counter%s SET val = val + 1 WHERE pk = '%s'",
+ BigFormat.latestVersion, getCompactNameSuffix(compact), valPk));
- for (int ck = 0; ck < 50; ck++)
- {
- String valCk = Integer.toString(ck);
+ for (int ck = 0; ck < 50; ck++)
+ {
+ String valCk = Integer.toString(ck);
- QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_clust (pk, ck, val) VALUES ('%s', '%s', '%s')",
- BigFormat.latestVersion, valPk, valCk + longString, randomString));
+ QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_clust%s (pk, ck, val) VALUES ('%s', '%s', '%s')",
+ BigFormat.latestVersion, getCompactNameSuffix(compact), valPk, valCk + longString, randomString));
- QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_clust_counter SET val = val + 1 WHERE pk = '%s' AND ck='%s'",
- BigFormat.latestVersion, valPk, valCk + longString));
+ QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_clust_counter%s SET val = val + 1 WHERE pk = '%s' AND ck='%s'",
+ BigFormat.latestVersion, getCompactNameSuffix(compact), valPk, valCk + longString));
+ }
}
}
@@ -398,10 +388,13 @@ public class LegacySSTableTest
File ksDir = new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables", BigFormat.latestVersion));
ksDir.mkdirs();
- copySstablesFromTestData(String.format("legacy_%s_simple", BigFormat.latestVersion), ksDir);
- copySstablesFromTestData(String.format("legacy_%s_simple_counter", BigFormat.latestVersion), ksDir);
- copySstablesFromTestData(String.format("legacy_%s_clust", BigFormat.latestVersion), ksDir);
- copySstablesFromTestData(String.format("legacy_%s_clust_counter", BigFormat.latestVersion), ksDir);
+ for (int compact = 0; compact <= 1; compact++)
+ {
+ copySstablesFromTestData(String.format("legacy_%s_simple%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir);
+ copySstablesFromTestData(String.format("legacy_%s_simple_counter%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir);
+ copySstablesFromTestData(String.format("legacy_%s_clust%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir);
+ copySstablesFromTestData(String.format("legacy_%s_clust_counter%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir);
+ }
}
private void copySstablesFromTestData(String table, File ksDir) throws IOException
@@ -420,12 +413,17 @@ public class LegacySSTableTest
private static void copySstablesToTestData(String legacyVersion, String table, File cfDir) throws IOException
{
- for (File file : new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table)).listFiles())
+ for (File file : getTableDir(legacyVersion, table).listFiles())
{
copyFile(cfDir, file);
}
}
+ private static File getTableDir(String legacyVersion, String table)
+ {
+ return new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table));
+ }
+
private static void copyFile(File cfDir, File file) throws IOException
{
byte[] buf = new byte[65536];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index c3a4539..360d262 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -406,7 +406,7 @@ public class BufferedRandomAccessFileTest
assert tmpFile.getPath().equals(r.getPath());
// Create a mark and move the rw there.
- final FileMark mark = r.mark();
+ final DataPosition mark = r.mark();
r.reset(mark);
// Expect this call to succeed.
@@ -457,7 +457,7 @@ public class BufferedRandomAccessFileTest
RandomAccessReader file = RandomAccessReader.open(channel);
file.seek(10);
- FileMark mark = file.mark();
+ DataPosition mark = file.mark();
file.seek(file.length());
assertTrue(file.isEOF());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
index f0d4383..e83c015 100644
--- a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
@@ -356,7 +356,7 @@ public class RandomAccessReaderTest
assertFalse(reader.isEOF());
assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining());
- FileMark mark = reader.mark();
+ DataPosition mark = reader.mark();
assertEquals(0, reader.bytesPastMark());
assertEquals(0, reader.bytesPastMark(mark));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
new file mode 100644
index 0000000..175ab53
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class RewindableDataInputStreamPlusTest
+{
+
+ private final int INITIAL_BUFFER_SIZE = 1;
+
+ private File file;
+
+ @Before
+ public void setup() throws Exception
+ {
+ this.file = new File(System.getProperty("java.io.tmpdir"), "subdir/test.buffer");
+ }
+
+ @Test
+ public void testMarkAndResetSimple() throws Exception
+ {
+ byte[] testData;
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (DataOutputStream out = new DataOutputStream(baos))
+ {
+ // boolean
+ out.writeBoolean(true);
+ // byte
+ out.writeByte(0x1);
+ // char
+ out.writeChar('a');
+ // short
+ out.writeShort(1);
+ // int
+ out.writeInt(1);
+ // long
+ out.writeLong(1L);
+ // float
+ out.writeFloat(1.0f);
+ // double
+ out.writeDouble(1.0d);
+
+ // String
+ out.writeUTF("abc");
+ testData = baos.toByteArray();
+ }
+
+ for (int memCapacity = 0; memCapacity <= 16; memCapacity++)
+ {
+ int diskCapacity = 16 - memCapacity;
+ try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+ INITIAL_BUFFER_SIZE, memCapacity, file,
+ diskCapacity))
+ {
+ try {
+ //should mark before resetting
+ reader.reset(null);
+ fail("Should have thrown IOException");
+ } catch (IOException e) {}
+
+ assertTrue(reader.readBoolean());
+
+ reader.mark();
+
+ try {
+ //cannot mark already marked stream
+ reader.mark();
+ fail("Should have thrown IllegalStateException");
+ } catch (IllegalStateException e) {}
+
+ assertEquals(0x1, reader.readByte());
+ assertEquals('a', reader.readChar());
+ assertEquals(3, reader.bytesPastMark(null));
+ reader.reset(null);
+
+ try {
+ //cannot mark when reading from cache
+ reader.mark();
+ fail("Should have thrown IllegalStateException");
+ } catch (IllegalStateException e) {}
+
+ //read again previous sequence
+ assertEquals(0x1, reader.readByte());
+ assertEquals('a', reader.readChar());
+ //finish reading again previous sequence
+ assertEquals(1, reader.readShort());
+
+ reader.mark();
+ assertEquals(1, reader.readInt());
+ assertEquals(1L, reader.readLong());
+ assertEquals(1.0f, reader.readFloat(), 0);
+ assertEquals(16, reader.bytesPastMark(null));
+ reader.reset(null);
+
+ //read again previous sequence
+ assertEquals(1, reader.readInt());
+ assertEquals(1L, reader.readLong());
+ assertEquals(1.0f, reader.readFloat(), 0);
+ //finish reading again previous sequence
+
+ //mark again
+ reader.mark();
+ assertEquals(1.0d, reader.readDouble(), 0);
+ assertEquals(8, reader.bytesPastMark(null));
+ reader.reset(null);
+
+ //read again previous sequence
+ assertEquals(1.0d, reader.readDouble(), 0);
+ //finish reading again previous sequence
+
+ //mark and reset
+ reader.mark();
+ reader.reset(null);
+
+ assertEquals("abc", reader.readUTF());
+
+ //check max file size
+ assertEquals(diskCapacity, file.length());
+ }
+ assertFalse(file.exists());
+ }
+ }
+
+ @Test
+ public void testVeryLargeCapacity() throws Exception
+ {
+ byte[] testData;
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (DataOutputStream out = new DataOutputStream(baos))
+ {
+ out.writeUTF("abc");
+ testData = baos.toByteArray();
+ }
+
+ try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+ INITIAL_BUFFER_SIZE, Integer.MAX_VALUE, file,
+ Integer.MAX_VALUE))
+ {
+ reader.mark();
+ assertEquals("abc", reader.readUTF());
+ reader.reset();
+ assertEquals("abc", reader.readUTF());
+ }
+ assertFalse(file.exists());
+
+
+ baos = new ByteArrayOutputStream();
+ try (DataOutputStream out = new DataOutputStream(baos))
+ {
+ out.writeBoolean(true);
+ out.writeBoolean(true);
+ testData = baos.toByteArray();
+ }
+ }
+
+ @Test
+ public void testMarkAndResetBigBuffer() throws Exception
+ {
+ byte[] testData;
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (DataOutputStream out = new DataOutputStream(baos))
+ {
+ // boolean
+ out.writeBoolean(true);
+ // byte
+ out.writeByte(0x1);
+ // char
+ out.writeChar('a');
+ // short
+ out.writeShort(1);
+ // int
+ out.writeInt(1);
+ // long
+ out.writeLong(1L);
+ // float
+ out.writeFloat(1.0f);
+ // double
+ out.writeDouble(1.0d);
+
+ // String
+ out.writeUTF("abc");
+ testData = baos.toByteArray();
+
+ // 1 (boolean) + 1 (byte) + 2 (char) + 2 (short) + 4 (int) + 8 (long)
+ // + 4 (float) + 8 (double) + 5 bytes (utf string)
+ }
+
+ for (int memCapacity = 0; memCapacity <= 18; memCapacity++)
+ {
+ int diskCapacity = 18 - memCapacity;
+ try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+ INITIAL_BUFFER_SIZE, memCapacity, file,
+ diskCapacity))
+ {
+ //read a big amount before resetting
+ reader.mark();
+ assertTrue(reader.readBoolean());
+ assertEquals(0x1, reader.readByte());
+ assertEquals('a', reader.readChar());
+ assertEquals(1, reader.readShort());
+ assertEquals(1, reader.readInt());
+ assertEquals(1L, reader.readLong());
+ reader.reset();
+
+ //read from buffer
+ assertTrue(reader.readBoolean());
+ assertEquals(0x1, reader.readByte());
+ assertEquals('a', reader.readChar());
+ assertEquals(1, reader.readShort());
+ assertEquals(1, reader.readInt());
+ assertEquals(1L, reader.readLong());
+
+ assertEquals(17, reader.available());
+
+ //mark again
+ reader.mark();
+ assertEquals(1.0f, reader.readFloat(), 0);
+ assertEquals(1.0d, reader.readDouble(), 0);
+ assertEquals("abc", reader.readUTF());
+ reader.reset();
+
+ assertEquals(17, reader.available());
+
+ assertEquals(1.0f, reader.readFloat(), 0);
+ assertEquals(1.0d, reader.readDouble(), 0);
+ assertEquals("abc", reader.readUTF());
+ }
+ assertFalse(file.exists());
+ }
+ }
+
+
+ @Test
+ public void testCircularSpillFile() throws Exception
+ {
+ byte[] testData;
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (DataOutputStream out = new DataOutputStream(baos))
+ {
+ // boolean
+ out.writeBoolean(true);
+ // byte
+ out.writeByte(0x1);
+ // char
+ out.writeChar('a');
+ // short
+ out.writeShort(1);
+ // int
+ out.writeInt(1);
+
+ // String
+ out.writeUTF("ab");
+ testData = baos.toByteArray();
+
+ // 1 (boolean) + 1 (byte) + 2 (char) + 2 (short) + 4 (int) + 4 bytes (utf string)
+ }
+
+ //read at most 4 bytes multiple times (and then check file size)
+ int MEM_SIZE = 0;
+ int DISK_SIZE = 4;
+ try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+ INITIAL_BUFFER_SIZE, MEM_SIZE, file,
+ DISK_SIZE))
+ {
+ //read 2 bytes and reset
+ reader.mark();
+ assertTrue(reader.readBoolean());
+ assertEquals(0x1, reader.readByte());
+ assertEquals(2, reader.bytesPastMark(null));
+ reader.reset();
+
+ //read again previous sequence
+ assertTrue(reader.readBoolean());
+ assertEquals(0x1, reader.readByte());
+ //finish reading again previous sequence
+
+ //read 4 bytes and reset
+ reader.mark();
+ assertEquals('a', reader.readChar());
+ assertEquals(1, reader.readShort());
+ assertEquals(4, reader.bytesPastMark(null));
+ reader.reset();
+
+ //read again previous sequence
+ assertEquals('a', reader.readChar());
+ assertEquals(1, reader.readShort());
+ //finish reading again previous sequence
+
+ //read 4 bytes and reset
+ reader.mark();
+ assertEquals(1, reader.readInt());
+ assertEquals(4, reader.bytesPastMark(null));
+ reader.reset();
+
+ //read again previous sequence
+ assertEquals(1, reader.readInt());
+
+ //check max file size
+ assertEquals(DISK_SIZE, file.length());
+ }
+ assertFalse(file.exists());
+ }
+
+ @Test
+ public void testExhaustCapacity() throws Exception
+ {
+ byte[] testData;
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (DataOutputStream out = new DataOutputStream(baos))
+ {
+ // boolean
+ out.writeBoolean(true);
+ // byte
+ out.writeByte(0x1);
+ // char
+ out.writeChar('a');
+ // short
+ out.writeShort(1);
+ testData = baos.toByteArray();
+ }
+
+ //test capacity exhausted when reading more than 4 bytes
+ testCapacityExhausted(testData, 0, 2);
+ testCapacityExhausted(testData, 2, 0);
+ testCapacityExhausted(testData, 1, 1);
+ }
+
+ private void testCapacityExhausted(byte[] testData, int memSize, int diskSize) throws IOException
+ {
+ try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+ INITIAL_BUFFER_SIZE, memSize, file,
+ diskSize))
+ {
+ //read 2 bytes and reset
+ reader.mark();
+ assertTrue(reader.readBoolean());
+ assertEquals(0x1, reader.readByte());
+ assertEquals(2, reader.bytesPastMark(null));
+ reader.reset();
+
+ //read again previous sequence
+ assertTrue(reader.readBoolean());
+ assertEquals(0x1, reader.readByte());
+ //finish reading again previous sequence
+
+ reader.mark();
+ //read 3 bytes - OK
+ assertEquals('a', reader.readChar());
+ //read 1 more bytes - CAPACITY will exhaust when trying to reset :(
+ assertEquals(1, reader.readShort());
+
+ try
+ {
+ reader.reset();
+ fail("Should have thrown IOException");
+ }
+ catch (IOException e) {}
+
+ //check max file size
+ assertEquals(diskSize, file.length());
+ }
+ assertFalse(file.exists());
+ }
+
+ @Test
+ public void testMarkAndResetUnsignedRead() throws Exception
+ {
+ byte[] testData;
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (DataOutputStream out = new DataOutputStream(baos))
+ {
+ // byte
+ out.writeByte(0x1);
+ // short
+ out.writeShort(2);
+ testData = baos.toByteArray();
+ }
+
+ for (int memCapacity = 0; memCapacity <= 1; memCapacity++)
+ {
+ int diskCapacity = 1 - memCapacity;
+ try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+ INITIAL_BUFFER_SIZE, memCapacity, file,
+ diskCapacity))
+ {
+ reader.mark();
+ assertEquals(1, reader.readUnsignedByte());
+ reader.reset();
+ assertEquals(1, reader.readUnsignedByte());
+
+ //will read first byte of short 2
+ reader.mark();
+ assertEquals(0, reader.readUnsignedByte());
+ reader.reset();
+
+ assertEquals(2, reader.readUnsignedShort());
+
+ reader.mark();
+ reader.reset();
+ assertEquals(0, reader.available());
+ }
+ }
+ assertFalse(file.exists());
+ }
+
+ @Test
+ public void testMarkAndResetSkipBytes() throws Exception
+ {
+ String testStr = "1234567890";
+ byte[] testData = testStr.getBytes();
+
+ for (int memCapacity = 0; memCapacity <= 7; memCapacity++)
+ {
+ int diskCapacity = 7 - memCapacity;
+ try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+ INITIAL_BUFFER_SIZE, memCapacity, file,
+ diskCapacity))
+ {
+ reader.mark();
+ // read first 5 bytes and rewind
+ byte[] out = new byte[5];
+ reader.readFully(out, 0, 5);
+ assertEquals("12345", new String(out));
+
+ // then skip 2 bytes (67)
+ reader.skipBytes(2);
+
+ assertEquals(7, reader.bytesPastMark(null));
+ reader.reset();
+
+ //now read part of the previously skipped bytes
+ out = new byte[5];
+ reader.readFully(out);
+ assertEquals("12345", new String(out));
+
+ //skip 3 bytes (2 from cache, 1 from stream)
+ reader.skip(3);
+
+ // mark and read 2 more bytes
+ reader.mark();
+ out = new byte[2];
+ reader.readFully(out);
+ assertEquals("90", new String(out));
+ assertEquals(0, reader.available());
+ reader.reset();
+
+ //reset and read only the next byte "9" in the third position
+ reader.readFully(out, 1, 1);
+ assertEquals("99", new String(out));
+
+ //now we read the remainder via readline
+ assertEquals(1, reader.available());
+ assertEquals("0", reader.readLine());
+
+ }
+ assertFalse(file.exists());
+ }
+ }
+
+ @Test
+ public void testMarkAndResetReadFully() throws Exception
+ {
+ String testStr = "1234567890";
+ byte[] testData = testStr.getBytes();
+
+ for (int memCapacity = 0; memCapacity <= 5; memCapacity++)
+ {
+ int diskCapacity = 5 - memCapacity;
+ try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+ INITIAL_BUFFER_SIZE, memCapacity, file,
+ diskCapacity))
+ {
+ reader.mark();
+ // read first 5 bytes and rewind
+ byte[] out = new byte[5];
+ reader.readFully(out, 0, 5);
+ assertEquals("12345", new String(out));
+ reader.reset();
+
+ // read half from cache, half from parent stream
+ out = new byte[7];
+ reader.readFully(out);
+ assertEquals("1234567", new String(out));
+
+ // mark and read 3 more bytes
+ reader.mark();
+ out = new byte[3];
+ reader.readFully(out);
+ assertEquals("890", new String(out));
+ assertEquals(0, reader.available());
+ reader.reset();
+
+ //reset and read only the next byte "8" in the third position
+ reader.readFully(out, 2, 1);
+ assertEquals("898", new String(out));
+
+ //now we read the remainder via readline
+ assertEquals(2, reader.available());
+ assertEquals("90", reader.readLine());
+ }
+ assertFalse(file.exists());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java b/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
index 221e55c..7693b45 100644
--- a/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
+++ b/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.cassandra.utils;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -25,8 +29,10 @@ import java.io.DataOutputStream;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import org.apache.cassandra.io.util.BytesReadTracker;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.io.util.TrackedInputStream;
public class BytesReadTrackerTest
{
@@ -34,6 +40,33 @@ public class BytesReadTrackerTest
@Test
public void testBytesRead() throws Exception
{
+ internalTestBytesRead(true);
+ internalTestBytesRead(false);
+ }
+
+ @Test
+ public void testUnsignedRead() throws Exception
+ {
+ internalTestUnsignedRead(true);
+ internalTestUnsignedRead(false);
+ }
+
+ @Test
+ public void testSkipBytesAndReadFully() throws Exception
+ {
+ internalTestSkipBytesAndReadFully(true);
+ internalTestSkipBytesAndReadFully(false);
+ }
+
+ @Test
+ public void testReadLine() throws Exception
+ {
+ internalTestReadLine(true);
+ internalTestReadLine(false);
+ }
+
+ public void internalTestBytesRead(boolean inputStream) throws Exception
+ {
byte[] testData;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -66,45 +99,46 @@ public class BytesReadTrackerTest
out.close();
}
- DataInputStream in = new DataInputStream(new ByteArrayInputStream(testData));
- BytesReadTracker tracker = new BytesReadTracker(in);
+ DataInputPlus.DataInputStreamPlus in = new DataInputPlus.DataInputStreamPlus(new ByteArrayInputStream(testData));
+ BytesReadTracker tracker = inputStream? new TrackedInputStream(in) : new TrackedDataInputPlus(in);
+ DataInputPlus reader = inputStream? new DataInputPlus.DataInputStreamPlus((TrackedInputStream)tracker) : (DataInputPlus) tracker;
try
{
// boolean = 1byte
- boolean bool = tracker.readBoolean();
+ boolean bool = reader.readBoolean();
assertTrue(bool);
assertEquals(1, tracker.getBytesRead());
// byte = 1byte
- byte b = tracker.readByte();
+ byte b = reader.readByte();
assertEquals(b, 0x1);
assertEquals(2, tracker.getBytesRead());
// char = 2byte
- char c = tracker.readChar();
+ char c = reader.readChar();
assertEquals('a', c);
assertEquals(4, tracker.getBytesRead());
// short = 2bytes
- short s = tracker.readShort();
+ short s = reader.readShort();
assertEquals(1, s);
assertEquals((short) 6, tracker.getBytesRead());
// int = 4bytes
- int i = tracker.readInt();
+ int i = reader.readInt();
assertEquals(1, i);
assertEquals(10, tracker.getBytesRead());
// long = 8bytes
- long l = tracker.readLong();
+ long l = reader.readLong();
assertEquals(1L, l);
assertEquals(18, tracker.getBytesRead());
// float = 4bytes
- float f = tracker.readFloat();
+ float f = reader.readFloat();
assertEquals(1.0f, f, 0);
assertEquals(22, tracker.getBytesRead());
// double = 8bytes
- double d = tracker.readDouble();
+ double d = reader.readDouble();
assertEquals(1.0d, d, 0);
assertEquals(30, tracker.getBytesRead());
// String("abc") = 2(string size) + 3 = 5 bytes
- String str = tracker.readUTF();
+ String str = reader.readUTF();
assertEquals("abc", str);
assertEquals(35, tracker.getBytesRead());
@@ -119,8 +153,7 @@ public class BytesReadTrackerTest
assertEquals(0, tracker.getBytesRead());
}
- @Test
- public void testUnsignedRead() throws Exception
+ public void internalTestUnsignedRead(boolean inputStream) throws Exception
{
byte[] testData;
@@ -139,17 +172,18 @@ public class BytesReadTrackerTest
out.close();
}
- DataInputStream in = new DataInputStream(new ByteArrayInputStream(testData));
- BytesReadTracker tracker = new BytesReadTracker(in);
+ DataInputPlus.DataInputStreamPlus in = new DataInputPlus.DataInputStreamPlus(new ByteArrayInputStream(testData));
+ BytesReadTracker tracker = inputStream? new TrackedInputStream(in) : new TrackedDataInputPlus(in);
+ DataInputPlus reader = inputStream? new DataInputPlus.DataInputStreamPlus((TrackedInputStream)tracker) : (DataInputPlus) tracker;
try
{
// byte = 1byte
- int b = tracker.readUnsignedByte();
+ int b = reader.readUnsignedByte();
assertEquals(b, 1);
assertEquals(1, tracker.getBytesRead());
// short = 2bytes
- int s = tracker.readUnsignedShort();
+ int s = reader.readUnsignedShort();
assertEquals(1, s);
assertEquals(3, tracker.getBytesRead());
@@ -161,30 +195,30 @@ public class BytesReadTrackerTest
}
}
- @Test
- public void testSkipBytesAndReadFully() throws Exception
+ public void internalTestSkipBytesAndReadFully(boolean inputStream) throws Exception
{
String testStr = "1234567890";
byte[] testData = testStr.getBytes();
- DataInputStream in = new DataInputStream(new ByteArrayInputStream(testData));
- BytesReadTracker tracker = new BytesReadTracker(in);
+ DataInputPlus.DataInputStreamPlus in = new DataInputPlus.DataInputStreamPlus(new ByteArrayInputStream(testData));
+ BytesReadTracker tracker = inputStream? new TrackedInputStream(in) : new TrackedDataInputPlus(in);
+ DataInputPlus reader = inputStream? new DataInputPlus.DataInputStreamPlus((TrackedInputStream)tracker) : (DataInputPlus) tracker;
try
{
// read first 5 bytes
byte[] out = new byte[5];
- tracker.readFully(out, 0, 5);
+ reader.readFully(out, 0, 5);
assertEquals("12345", new String(out));
assertEquals(5, tracker.getBytesRead());
// then skip 2 bytes
- tracker.skipBytes(2);
+ reader.skipBytes(2);
assertEquals(7, tracker.getBytesRead());
// and read the rest
out = new byte[3];
- tracker.readFully(out);
+ reader.readFully(out);
assertEquals("890", new String(out));
assertEquals(10, tracker.getBytesRead());
@@ -196,16 +230,24 @@ public class BytesReadTrackerTest
}
}
- @Test(expected = UnsupportedOperationException.class)
- public void testReadLine() throws Exception
+ public void internalTestReadLine(boolean inputStream) throws Exception
{
DataInputStream in = new DataInputStream(new ByteArrayInputStream("1".getBytes()));
- BytesReadTracker tracker = new BytesReadTracker(in);
+ BytesReadTracker tracker = inputStream? new TrackedInputStream(in) : new TrackedDataInputPlus(in);
+ DataInputPlus reader = inputStream? new DataInputPlus.DataInputStreamPlus((TrackedInputStream)tracker) : (DataInputPlus) tracker;
try
{
- // throws UnsupportedOperationException
- tracker.readLine();
+ String line = reader.readLine();
+ if (inputStream)
+ assertEquals(line, "1");
+ else
+ fail("Should have thrown UnsupportedOperationException");
+ }
+ catch (UnsupportedOperationException e)
+ {
+ if (inputStream)
+ fail("Should have not thrown UnsupportedOperationException");
}
finally
{