You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by "Robert Stupp (JIRA)" <ji...@apache.org> on 2015/09/05 20:57:45 UTC
[jira] [Commented] (CASSANDRA-10237) CFS.loadNewSSTables() broken
for pre-3.0 sstables
[ https://issues.apache.org/jira/browse/CASSANDRA-10237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14732071#comment-14732071 ]
Robert Stupp commented on CASSANDRA-10237:
------------------------------------------
This also breaks sstable verify (both standalone and via JMX/nodetool): the digest file isn't verified, because the file existence check in {{org.apache.cassandra.db.compaction.Verifier#verify}} doesn't succeed. I.e. this results in no error, but the functionality is lost.
> CFS.loadNewSSTables() broken for pre-3.0 sstables
> -------------------------------------------------
>
> Key: CASSANDRA-10237
> URL: https://issues.apache.org/jira/browse/CASSANDRA-10237
> Project: Cassandra
> Issue Type: Bug
> Reporter: Robert Stupp
>
> While working on CASSANDRA-10236 I discovered that {{CFS.loadNewSSTables()}} doesn't work for pre-3.0 sstables - just for version {{ma}} sstables.
> TBC: Starting C* with 2.0, 2.1 or 2.2 sstables works - but loading new sstables during runtime doesn't.
> Issues with {{CFS.loadNewSSTables()}} discovered so far:
> # {{MetadataSerializer.deserialize(Descriptor,FileDataInput,EnumSet)}} returns {{null}} for {{MetadataType.HEADER}}, which results in an NPE later in {{MetadataSerializer.serialize}} when executing {{Collections.sort}}.
> # After working around the previous issue, it turns out that it couldn't load the digest file, since {{Component.DIGEST}} is a singleton which refers to CRC32, but pre-3.0 sstables use Adler32.
> # After working around that one, it fails in {{StreamingHistogram$StreamingHistogramSerializer.deserialize}} as {{maxBinSize==Integer.MAX_VALUE}}.
> As loading legacy sstables works fine during startup, I assume my workarounds are not correct.
> For reference, [this commit|https://github.com/snazy/cassandra/commit/2f0668a1e1d8a101e8301b9c4211b164c113afaa] contains a ton of legacy sstables (simple, counter, clustered and clustered+counter) for 2.0, 2.1 and 2.2. I've extended LegacySSTablesTest to read these tables using {{CFS.loadNewSSTables()}}.
> {noformat:title=LegacySSTablesTest.txt}
> diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
> index d2922cc..1be6450 100644
> --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
> +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
> @@ -18,6 +18,9 @@
> package org.apache.cassandra.io.sstable;
>
> import java.io.File;
> +import java.io.FileInputStream;
> +import java.io.FileOutputStream;
> +import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.util.ArrayList;
> import java.util.HashSet;
> @@ -27,10 +30,15 @@ import java.util.Set;
> import org.junit.BeforeClass;
> import org.junit.Test;
>
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> import org.apache.cassandra.SchemaLoader;
> import org.apache.cassandra.Util;
> import org.apache.cassandra.config.CFMetaData;
> +import org.apache.cassandra.cql3.QueryProcessor;
> import org.apache.cassandra.db.ColumnFamilyStore;
> +import org.apache.cassandra.db.ConsistencyLevel;
> import org.apache.cassandra.db.DeletionTime;
> import org.apache.cassandra.db.Keyspace;
> import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator;
> @@ -43,6 +51,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
> import org.apache.cassandra.io.sstable.format.SSTableFormat;
> import org.apache.cassandra.io.sstable.format.SSTableReader;
> import org.apache.cassandra.io.sstable.format.Version;
> +import org.apache.cassandra.io.sstable.format.big.BigFormat;
> import org.apache.cassandra.schema.KeyspaceParams;
> import org.apache.cassandra.service.StorageService;
> import org.apache.cassandra.streaming.StreamPlan;
> @@ -57,6 +66,8 @@ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
> */
> public class LegacySSTableTest
> {
> + private static final Logger logger = LoggerFactory.getLogger(LegacySSTableTest.class);
> +
> public static final String LEGACY_SSTABLE_PROP = "legacy-sstable-root";
> public static final String KSNAME = "Keyspace1";
> public static final String CFNAME = "Standard1";
> @@ -64,6 +75,8 @@ public class LegacySSTableTest
> public static Set<String> TEST_DATA;
> public static File LEGACY_SSTABLE_ROOT;
>
> + public static final String[] legacyVersions = {"jb", "ka", "la"};
> +
> @BeforeClass
> public static void defineSchema() throws ConfigurationException
> {
> @@ -208,4 +221,65 @@ public class LegacySSTableTest
> throw e;
> }
> }
> +
> + @Test
> + public void testLegacyCqlTables() throws Exception
> + {
> + QueryProcessor.executeInternal("CREATE KEYSPACE legacy_tables WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
> +
> + loadLegacyTables();
> + }
> +
> + private void loadLegacyTables() throws IOException
> + {
> + for (String legacyVersion : legacyVersions)
> + {
> + logger.info("Preparing legacy version {}", legacyVersion);
> +
> + QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple (pk text PRIMARY KEY, val text)", legacyVersion));
> + QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter (pk text PRIMARY KEY, val counter)", legacyVersion));
> + QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust (pk text, ck text, val text, PRIMARY KEY (pk, ck))", legacyVersion));
> + QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter (pk text, ck text, val counter, PRIMARY KEY (pk, ck))", legacyVersion));
> +
> + loadLegacyTable("legacy_%s_simple", legacyVersion);
> + loadLegacyTable("legacy_%s_simple_counter", legacyVersion);
> + loadLegacyTable("legacy_%s_clust", legacyVersion);
> + loadLegacyTable("legacy_%s_clust_counter", legacyVersion);
> +
> + }
> + }
> +
> + private void loadLegacyTable(String tablePattern, String legacyVersion) throws IOException
> + {
> + String table = String.format(tablePattern, legacyVersion);
> +
> + logger.info("Loading legacy table {}", table);
> +
> + ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(table);
> +
> + for (File cfDir : cfs.getDirectories().getCFDirectories())
> + {
> + copySstables(legacyVersion, table, cfDir);
> + }
> +
> + cfs.loadNewSSTables();
> + }
> +
> + private static void copySstables(String legacyVersion, String table, File cfDir) throws IOException
> + {
> + byte[] buf = new byte[65536];
> +
> + for (File file : new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table)).listFiles())
> + {
> + if (file.isFile())
> + {
> + File target = new File(cfDir, file.getName());
> + int rd;
> + FileInputStream is = new FileInputStream(file);
> + FileOutputStream os = new FileOutputStream(target);
> + while ((rd = is.read(buf)) >= 0)
> + os.write(buf, 0, rd);
> + }
> + }
> + }
> }
> {noformat}
> {noformat:title=broken-workaround}
> diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
> index 554c782..e953b1d 100644
> --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
> +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
> @@ -96,7 +96,7 @@ public class Verifier implements Closeable
> {
> validator = null;
>
> - if (new File(sstable.descriptor.filenameFor(Component.DIGEST)).exists())
> + if (new File(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent())).exists())
> {
> validator = DataIntegrityMetadata.fileDigestValidator(sstable.descriptor);
> validator.validate();
> diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
> index 54dd35b..d0405e4 100644
> --- a/src/java/org/apache/cassandra/io/sstable/Component.java
> +++ b/src/java/org/apache/cassandra/io/sstable/Component.java
> @@ -34,6 +34,7 @@ public class Component
> public static final char separator = '-';
>
> final static EnumSet<Type> TYPES = EnumSet.allOf(Type.class);
> +
> public enum Type
> {
> // the base data for an sstable: the remaining components can be regenerated
> @@ -79,13 +80,17 @@ public class Component
> }
> }
>
> + private static final String DIGEST_CRC32_NAME = "Digest.crc32";
> + private static final String DIGEST_ADLER32_NAME = "Digest.adler32";
> +
> // singleton components for types that don't need ids
> public final static Component DATA = new Component(Type.DATA);
> public final static Component PRIMARY_INDEX = new Component(Type.PRIMARY_INDEX);
> public final static Component FILTER = new Component(Type.FILTER);
> public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO);
> public final static Component STATS = new Component(Type.STATS);
> - public final static Component DIGEST = new Component(Type.DIGEST);
> + public final static Component DIGEST_CRC32 = new Component(Type.DIGEST, DIGEST_CRC32_NAME);
> + public final static Component DIGEST_ADLER32 = new Component(Type.DIGEST, DIGEST_ADLER32_NAME);
> public final static Component CRC = new Component(Type.CRC);
> public final static Component SUMMARY = new Component(Type.SUMMARY);
> public final static Component TOC = new Component(Type.TOC);
> @@ -138,11 +143,23 @@ public class Component
> case FILTER: component = Component.FILTER; break;
> case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; break;
> case STATS: component = Component.STATS; break;
> - case DIGEST: component = Component.DIGEST; break;
> case CRC: component = Component.CRC; break;
> case SUMMARY: component = Component.SUMMARY; break;
> case TOC: component = Component.TOC; break;
> case CUSTOM: component = new Component(Type.CUSTOM, path.right); break;
> + case DIGEST:
> + switch (path.right)
> + {
> + case DIGEST_CRC32_NAME:
> + component = Component.DIGEST_CRC32;
> + break;
> + case DIGEST_ADLER32_NAME:
> + component = Component.DIGEST_ADLER32;
> + break;
> + default:
> + throw new IllegalStateException();
> + }
> + break;
> default:
> throw new IllegalStateException();
> }
> diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
> index 38829df..0db6f00 100644
> --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
> +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
> @@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
> import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
> import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
> import org.apache.cassandra.utils.Pair;
> +import org.apache.hadoop.mapred.JobTracker;
>
> import static org.apache.cassandra.io.sstable.Component.separator;
>
> @@ -344,4 +345,16 @@ public class Descriptor
> {
> return hashCode;
> }
> +
> + public Component digestComponent()
> + {
> + switch (version.compressedChecksumType())
> + {
> + case Adler32:
> + return Component.DIGEST_ADLER32;
> + case CRC32:
> + return Component.DIGEST_CRC32;
> + }
> + throw new IllegalStateException();
> + }
> }
> diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
> index bd21536..74e4b56 100644
> --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
> +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
> @@ -131,7 +131,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
> Component.STATS,
> Component.SUMMARY,
> Component.TOC,
> - Component.DIGEST));
> + Component.DIGEST_CRC32));
>
> if (metadata.params.bloomFilterFpChance < 1.0)
> components.add(Component.FILTER);
> diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
> index 9a5eae8..a40c37a 100644
> --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
> +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
> @@ -122,7 +122,12 @@ public class MetadataSerializer implements IMetadataSerializer
> in.seek(offset);
> component = type.serializer.deserialize(descriptor.version, in);
> }
> - components.put(type, component);
> + if (component == null)
> + {
> + assert type != MetadataType.HEADER || !descriptor.version.storeRows();
> + }
> + else
> + components.put(type, component);
> }
> return components;
> }
> diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
> index 70cd860..b88f4f2 100644
> --- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
> +++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
> @@ -109,7 +109,7 @@ public class DataIntegrityMetadata
> {
> this.descriptor = descriptor;
> checksum = descriptor.version.uncompressedChecksumType().newInstance();
> - digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DIGEST)));
> + digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(descriptor.digestComponent())));
> dataReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DATA)));
> try
> {
> @@ -211,7 +211,7 @@ public class DataIntegrityMetadata
>
> public void writeFullChecksum(Descriptor descriptor)
> {
> - File outFile = new File(descriptor.filenameFor(Component.DIGEST));
> + File outFile = new File(descriptor.filenameFor(descriptor.digestComponent()));
> try (BufferedWriter out =Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8))
> {
> out.write(String.valueOf(fullChecksum.getValue()));
> diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
> index 920eee0..1420cae 100644
> --- a/src/java/org/apache/cassandra/io/util/FileUtils.java
> +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
> @@ -173,7 +173,7 @@ public class FileUtils
>
> public static void renameWithConfirm(File from, File to)
> {
> - assert from.exists();
> + assert from.exists() : String.format("File to rename does not exist: %s", from.getPath());
> if (logger.isDebugEnabled())
> logger.debug((String.format("Renaming %s to %s", from.getPath(), to.getPath())));
> // this is not FSWE because usually when we see it it's because we didn't close the file before renaming it,
> diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
> index 13ce0c1..0233169 100644
> --- a/test/unit/org/apache/cassandra/db/VerifyTest.java
> +++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
> @@ -275,11 +275,11 @@ public class VerifyTest
> SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
>
>
> - RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw");
> + RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()), "rw");
> Long correctChecksum = Long.parseLong(file.readLine());
> file.close();
>
> - writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(Component.DIGEST));
> + writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()));
>
> try (Verifier verifier = new Verifier(cfs, sstable, false))
> {
> @@ -315,7 +315,7 @@ public class VerifyTest
> file.close();
>
> // Update the Digest to have the right Checksum
> - writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(Component.DIGEST));
> + writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()));
>
> try (Verifier verifier = new Verifier(cfs, sstable, false))
> {
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)