You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by "Robert Stupp (JIRA)" <ji...@apache.org> on 2015/09/05 20:57:45 UTC

[jira] [Commented] (CASSANDRA-10237) CFS.loadNewSSTables() broken for pre-3.0 sstables

    [ https://issues.apache.org/jira/browse/CASSANDRA-10237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14732071#comment-14732071 ] 

Robert Stupp commented on CASSANDRA-10237:
------------------------------------------

This also breaks sstable verify (both standalone and via JMX/nodetool): the digest file is never verified, because the file existence check in {{org.apache.cassandra.db.compaction.Verifier#verify}} doesn't succeed. I.e. no error is reported, but the digest verification is silently skipped.

> CFS.loadNewSSTables() broken for pre-3.0 sstables
> -------------------------------------------------
>
>                 Key: CASSANDRA-10237
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-10237
>             Project: Cassandra
>          Issue Type: Bug
>            Reporter: Robert Stupp
>
> While working on CASSANDRA-10236 I discovered that {{CFS.loadNewSSTables()}} doesn't work for pre-3.0 sstables - just for version {{ma}} sstables.
> TBC: Starting C* with 2.0, 2.1 or 2.2 sstables works - but loading new sstables during runtime doesn't.
> Issues with {{CFS.loadNewSSTables()}} discovered so far:
> # {{MetadataSerializer.deserialize(Descriptor,FileDataInput,EnumSet)}} returns {{null}} for {{MetadataType.HEADER}}, which results in an NPE later when {{MetadataSerializer.serialize}} executes {{Collections.sort}}.
> # After working around the previous issue, it turns out that it couldn't load the digest file, since {{Component.DIGEST}} is a singleton which refers to CRC32, but pre-3.0 sstables use Adler32.
> # After working around that one, it fails in {{StreamingHistogram$StreamingHistogramSerializer.deserialize}} as {{maxBinSize==Integer.MAX_VALUE}}.
> As loading legacy sstables works fine during startup, I assume my workarounds are not correct.
> For reference, [this commit|https://github.com/snazy/cassandra/commit/2f0668a1e1d8a101e8301b9c4211b164c113afaa] contains a ton of legacy sstables (simple, counter, clustered and clustered+counter) for 2.0, 2.1 and 2.2. I've extended LegacySSTablesTest to read these tables using {{CFS.loadNewSSTables()}}.
> {noformat:title=LegacySSTablesTest.txt}
> diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
> index d2922cc..1be6450 100644
> --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
> +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
> @@ -18,6 +18,9 @@
>  package org.apache.cassandra.io.sstable;
>  
>  import java.io.File;
> +import java.io.FileInputStream;
> +import java.io.FileOutputStream;
> +import java.io.IOException;
>  import java.nio.ByteBuffer;
>  import java.util.ArrayList;
>  import java.util.HashSet;
> @@ -27,10 +30,15 @@ import java.util.Set;
>  import org.junit.BeforeClass;
>  import org.junit.Test;
>  
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
>  import org.apache.cassandra.SchemaLoader;
>  import org.apache.cassandra.Util;
>  import org.apache.cassandra.config.CFMetaData;
> +import org.apache.cassandra.cql3.QueryProcessor;
>  import org.apache.cassandra.db.ColumnFamilyStore;
> +import org.apache.cassandra.db.ConsistencyLevel;
>  import org.apache.cassandra.db.DeletionTime;
>  import org.apache.cassandra.db.Keyspace;
>  import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator;
> @@ -43,6 +51,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
>  import org.apache.cassandra.io.sstable.format.SSTableFormat;
>  import org.apache.cassandra.io.sstable.format.SSTableReader;
>  import org.apache.cassandra.io.sstable.format.Version;
> +import org.apache.cassandra.io.sstable.format.big.BigFormat;
>  import org.apache.cassandra.schema.KeyspaceParams;
>  import org.apache.cassandra.service.StorageService;
>  import org.apache.cassandra.streaming.StreamPlan;
> @@ -57,6 +66,8 @@ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
>   */
>  public class LegacySSTableTest
>  {
> +    private static final Logger logger = LoggerFactory.getLogger(LegacySSTableTest.class);
> +
>      public static final String LEGACY_SSTABLE_PROP = "legacy-sstable-root";
>      public static final String KSNAME = "Keyspace1";
>      public static final String CFNAME = "Standard1";
> @@ -64,6 +75,8 @@ public class LegacySSTableTest
>      public static Set<String> TEST_DATA;
>      public static File LEGACY_SSTABLE_ROOT;
>  
> +    public static final String[] legacyVersions = {"jb", "ka", "la"};
> +
>      @BeforeClass
>      public static void defineSchema() throws ConfigurationException
>      {
> @@ -208,4 +221,65 @@ public class LegacySSTableTest
>              throw e;
>          }
>      }
> +
> +    @Test
> +    public void testLegacyCqlTables() throws Exception
> +    {
> +        QueryProcessor.executeInternal("CREATE KEYSPACE legacy_tables WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
> +
> +        loadLegacyTables();
> +    }
> +
> +    private void loadLegacyTables() throws IOException
> +    {
> +        for (String legacyVersion : legacyVersions)
> +        {
> +            logger.info("Preparing legacy version {}", legacyVersion);
> +
> +            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple (pk text PRIMARY KEY, val text)", legacyVersion));
> +            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter (pk text PRIMARY KEY, val counter)", legacyVersion));
> +            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust (pk text, ck text, val text, PRIMARY KEY (pk, ck))", legacyVersion));
> +            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter (pk text, ck text, val counter, PRIMARY KEY (pk, ck))", legacyVersion));
> +
> +            loadLegacyTable("legacy_%s_simple", legacyVersion);
> +            loadLegacyTable("legacy_%s_simple_counter", legacyVersion);
> +            loadLegacyTable("legacy_%s_clust", legacyVersion);
> +            loadLegacyTable("legacy_%s_clust_counter", legacyVersion);
> +
> +        }
> +    }
> +
> +    private void loadLegacyTable(String tablePattern, String legacyVersion) throws IOException
> +    {
> +        String table = String.format(tablePattern, legacyVersion);
> +
> +        logger.info("Loading legacy table {}", table);
> +
> +        ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(table);
> +
> +        for (File cfDir : cfs.getDirectories().getCFDirectories())
> +        {
> +            copySstables(legacyVersion, table, cfDir);
> +        }
> +
> +        cfs.loadNewSSTables();
> +    }
> +
> +    private static void copySstables(String legacyVersion, String table, File cfDir) throws IOException
> +    {
> +        byte[] buf = new byte[65536];
> +
> +        for (File file : new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table)).listFiles())
> +        {
> +            if (file.isFile())
> +            {
> +                File target = new File(cfDir, file.getName());
> +                int rd;
> +                FileInputStream is = new FileInputStream(file);
> +                FileOutputStream os = new FileOutputStream(target);
> +                while ((rd = is.read(buf)) >= 0)
> +                    os.write(buf, 0, rd);
> +            }
> +        }
> +    }
>  }
> {noformat}
> {noformat:title=broken-workaround}
> diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
> index 554c782..e953b1d 100644
> --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
> +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
> @@ -96,7 +96,7 @@ public class Verifier implements Closeable
>          {
>              validator = null;
>  
> -            if (new File(sstable.descriptor.filenameFor(Component.DIGEST)).exists())
> +            if (new File(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent())).exists())
>              {
>                  validator = DataIntegrityMetadata.fileDigestValidator(sstable.descriptor);
>                  validator.validate();
> diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
> index 54dd35b..d0405e4 100644
> --- a/src/java/org/apache/cassandra/io/sstable/Component.java
> +++ b/src/java/org/apache/cassandra/io/sstable/Component.java
> @@ -34,6 +34,7 @@ public class Component
>      public static final char separator = '-';
>  
>      final static EnumSet<Type> TYPES = EnumSet.allOf(Type.class);
> +
>      public enum Type
>      {
>          // the base data for an sstable: the remaining components can be regenerated
> @@ -79,13 +80,17 @@ public class Component
>          }
>      }
>  
> +    private static final String DIGEST_CRC32_NAME = "Digest.crc32";
> +    private static final String DIGEST_ADLER32_NAME = "Digest.adler32";
> +
>      // singleton components for types that don't need ids
>      public final static Component DATA = new Component(Type.DATA);
>      public final static Component PRIMARY_INDEX = new Component(Type.PRIMARY_INDEX);
>      public final static Component FILTER = new Component(Type.FILTER);
>      public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO);
>      public final static Component STATS = new Component(Type.STATS);
> -    public final static Component DIGEST = new Component(Type.DIGEST);
> +    public final static Component DIGEST_CRC32 = new Component(Type.DIGEST, DIGEST_CRC32_NAME);
> +    public final static Component DIGEST_ADLER32 = new Component(Type.DIGEST, DIGEST_ADLER32_NAME);
>      public final static Component CRC = new Component(Type.CRC);
>      public final static Component SUMMARY = new Component(Type.SUMMARY);
>      public final static Component TOC = new Component(Type.TOC);
> @@ -138,11 +143,23 @@ public class Component
>              case FILTER:            component = Component.FILTER;                       break;
>              case COMPRESSION_INFO:  component = Component.COMPRESSION_INFO;             break;
>              case STATS:             component = Component.STATS;                        break;
> -            case DIGEST:            component = Component.DIGEST;                       break;
>              case CRC:               component = Component.CRC;                          break;
>              case SUMMARY:           component = Component.SUMMARY;                      break;
>              case TOC:               component = Component.TOC;                          break;
>              case CUSTOM:            component = new Component(Type.CUSTOM, path.right); break;
> +            case DIGEST:
> +                switch (path.right)
> +                {
> +                    case DIGEST_CRC32_NAME:
> +                        component = Component.DIGEST_CRC32;
> +                        break;
> +                    case DIGEST_ADLER32_NAME:
> +                        component = Component.DIGEST_ADLER32;
> +                        break;
> +                    default:
> +                        throw new IllegalStateException();
> +                }
> +                break;
>              default:
>                   throw new IllegalStateException();
>          }
> diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
> index 38829df..0db6f00 100644
> --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
> +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
> @@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
>  import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
>  import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
>  import org.apache.cassandra.utils.Pair;
> +import org.apache.hadoop.mapred.JobTracker;
>  
>  import static org.apache.cassandra.io.sstable.Component.separator;
>  
> @@ -344,4 +345,16 @@ public class Descriptor
>      {
>          return hashCode;
>      }
> +
> +    public Component digestComponent()
> +    {
> +        switch (version.compressedChecksumType())
> +        {
> +            case Adler32:
> +                return Component.DIGEST_ADLER32;
> +            case CRC32:
> +                return Component.DIGEST_CRC32;
> +        }
> +        throw new IllegalStateException();
> +    }
>  }
> diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
> index bd21536..74e4b56 100644
> --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
> +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
> @@ -131,7 +131,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
>                  Component.STATS,
>                  Component.SUMMARY,
>                  Component.TOC,
> -                Component.DIGEST));
> +                Component.DIGEST_CRC32));
>  
>          if (metadata.params.bloomFilterFpChance < 1.0)
>              components.add(Component.FILTER);
> diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
> index 9a5eae8..a40c37a 100644
> --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
> +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
> @@ -122,7 +122,12 @@ public class MetadataSerializer implements IMetadataSerializer
>                  in.seek(offset);
>                  component = type.serializer.deserialize(descriptor.version, in);
>              }
> -            components.put(type, component);
> +            if (component == null)
> +            {
> +                assert type != MetadataType.HEADER || !descriptor.version.storeRows();
> +            }
> +            else
> +                components.put(type, component);
>          }
>          return components;
>      }
> diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
> index 70cd860..b88f4f2 100644
> --- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
> +++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
> @@ -109,7 +109,7 @@ public class DataIntegrityMetadata
>          {
>              this.descriptor = descriptor;
>              checksum = descriptor.version.uncompressedChecksumType().newInstance();
> -            digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DIGEST)));
> +            digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(descriptor.digestComponent())));
>              dataReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DATA)));
>              try
>              {
> @@ -211,7 +211,7 @@ public class DataIntegrityMetadata
>  
>          public void writeFullChecksum(Descriptor descriptor)
>          {
> -            File outFile = new File(descriptor.filenameFor(Component.DIGEST));
> +            File outFile = new File(descriptor.filenameFor(descriptor.digestComponent()));
>              try (BufferedWriter out =Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8))
>              {
>                  out.write(String.valueOf(fullChecksum.getValue()));
> diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
> index 920eee0..1420cae 100644
> --- a/src/java/org/apache/cassandra/io/util/FileUtils.java
> +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
> @@ -173,7 +173,7 @@ public class FileUtils
>  
>      public static void renameWithConfirm(File from, File to)
>      {
> -        assert from.exists();
> +        assert from.exists() : String.format("File to rename does not exist: %s", from.getPath());
>          if (logger.isDebugEnabled())
>              logger.debug((String.format("Renaming %s to %s", from.getPath(), to.getPath())));
>          // this is not FSWE because usually when we see it it's because we didn't close the file before renaming it,
> diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
> index 13ce0c1..0233169 100644
> --- a/test/unit/org/apache/cassandra/db/VerifyTest.java
> +++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
> @@ -275,11 +275,11 @@ public class VerifyTest
>          SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
>  
>  
> -        RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw");
> +        RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()), "rw");
>          Long correctChecksum = Long.parseLong(file.readLine());
>          file.close();
>  
> -        writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(Component.DIGEST));
> +        writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()));
>  
>          try (Verifier verifier = new Verifier(cfs, sstable, false))
>          {
> @@ -315,7 +315,7 @@ public class VerifyTest
>          file.close();
>  
>          // Update the Digest to have the right Checksum
> -        writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(Component.DIGEST));
> +        writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()));
>  
>          try (Verifier verifier = new Verifier(cfs, sstable, false))
>          {
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)