You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by "Robert Stupp (JIRA)" <ji...@apache.org> on 2015/09/01 17:16:45 UTC

[jira] [Created] (CASSANDRA-10237) CFS.loadNewSSTables() broken for pre-3.0 sstables

Robert Stupp created CASSANDRA-10237:
----------------------------------------

             Summary: CFS.loadNewSSTables() broken for pre-3.0 sstables
                 Key: CASSANDRA-10237
                 URL: https://issues.apache.org/jira/browse/CASSANDRA-10237
             Project: Cassandra
          Issue Type: Bug
            Reporter: Robert Stupp


While working on CASSANDRA-10236 I discovered that {{CFS.loadNewSSTables()}} doesn't work for pre-3.0 sstables - just for version {{ma}} sstables.

TBC: Starting C* with 2.0, 2.1 or 2.2 sstables works - but loading new sstables during runtime doesn't.

Issues with {{CFS.loadNewSSTables()}} discovered so far:
# {{MetadataSerializer.deserialize(Descriptor,FileDataInput,EnumSet)}} returns {{null}} for {{MetadataType.HEADER}} which results in a NPE later in {{MetadataSerializer.serialize}} executing {{Collections.sort}}.
# After working around the previous issue, it turns out that it couldn't load the digest file, since {{Component.DIGEST}} is a singleton which refers to CRC32, but pre-3.0 sstables use Adler32.
# After working around that one, it fails in {{StreamingHistogram$StreamingHistogramSerializer.deserialize}} as {{maxBinSize==Integer.MAX_VALUE}}.

As loading legacy sstables works fine during startup, I suspect my workarounds are not correct.

For reference, [this commit|https://github.com/snazy/cassandra/commit/2f0668a1e1d8a101e8301b9c4211b164c113afaa] contains a ton of legacy sstables (simple, counter, clustered and clustered+counter) for 2.0, 2.1 and 2.2. I've extended LegacySSTablesTest to read these tables using {{CFS.loadNewSSTables()}}.

{noformat:title=LegacySSTablesTest.txt}
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index d2922cc..1be6450 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -27,10 +30,15 @@ import java.util.Set;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator;
@@ -43,6 +51,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.StreamPlan;
@@ -57,6 +66,8 @@ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
  */
 public class LegacySSTableTest
 {
+    private static final Logger logger = LoggerFactory.getLogger(LegacySSTableTest.class);
+
     public static final String LEGACY_SSTABLE_PROP = "legacy-sstable-root";
     public static final String KSNAME = "Keyspace1";
     public static final String CFNAME = "Standard1";
@@ -64,6 +75,8 @@ public class LegacySSTableTest
     public static Set<String> TEST_DATA;
     public static File LEGACY_SSTABLE_ROOT;
 
+    public static final String[] legacyVersions = {"jb", "ka", "la"};
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
@@ -208,4 +221,65 @@ public class LegacySSTableTest
             throw e;
         }
     }
+
+    @Test
+    public void testLegacyCqlTables() throws Exception
+    {
+        QueryProcessor.executeInternal("CREATE KEYSPACE legacy_tables WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
+
+        loadLegacyTables();
+    }
+
+    private void loadLegacyTables() throws IOException
+    {
+        for (String legacyVersion : legacyVersions)
+        {
+            logger.info("Preparing legacy version {}", legacyVersion);
+
+            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple (pk text PRIMARY KEY, val text)", legacyVersion));
+            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter (pk text PRIMARY KEY, val counter)", legacyVersion));
+            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust (pk text, ck text, val text, PRIMARY KEY (pk, ck))", legacyVersion));
+            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter (pk text, ck text, val counter, PRIMARY KEY (pk, ck))", legacyVersion));
+
+            loadLegacyTable("legacy_%s_simple", legacyVersion);
+            loadLegacyTable("legacy_%s_simple_counter", legacyVersion);
+            loadLegacyTable("legacy_%s_clust", legacyVersion);
+            loadLegacyTable("legacy_%s_clust_counter", legacyVersion);
+
+        }
+    }
+
+    private void loadLegacyTable(String tablePattern, String legacyVersion) throws IOException
+    {
+        String table = String.format(tablePattern, legacyVersion);
+
+        logger.info("Loading legacy table {}", table);
+
+        ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(table);
+
+        for (File cfDir : cfs.getDirectories().getCFDirectories())
+        {
+            copySstables(legacyVersion, table, cfDir);
+        }
+
+        cfs.loadNewSSTables();
+    }
+
+    private static void copySstables(String legacyVersion, String table, File cfDir) throws IOException
+    {
+        byte[] buf = new byte[65536];
+
+        for (File file : new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table)).listFiles())
+        {
+            if (file.isFile())
+            {
+                File target = new File(cfDir, file.getName());
+                int rd;
+                FileInputStream is = new FileInputStream(file);
+                FileOutputStream os = new FileOutputStream(target);
+                while ((rd = is.read(buf)) >= 0)
+                    os.write(buf, 0, rd);
+            }
+        }
+    }
 }
{noformat}

{noformat:title=broken-workaround}
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 554c782..e953b1d 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -96,7 +96,7 @@ public class Verifier implements Closeable
         {
             validator = null;
 
-            if (new File(sstable.descriptor.filenameFor(Component.DIGEST)).exists())
+            if (new File(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent())).exists())
             {
                 validator = DataIntegrityMetadata.fileDigestValidator(sstable.descriptor);
                 validator.validate();
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 54dd35b..d0405e4 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -34,6 +34,7 @@ public class Component
     public static final char separator = '-';
 
     final static EnumSet<Type> TYPES = EnumSet.allOf(Type.class);
+
     public enum Type
     {
         // the base data for an sstable: the remaining components can be regenerated
@@ -79,13 +80,17 @@ public class Component
         }
     }
 
+    private static final String DIGEST_CRC32_NAME = "Digest.crc32";
+    private static final String DIGEST_ADLER32_NAME = "Digest.adler32";
+
     // singleton components for types that don't need ids
     public final static Component DATA = new Component(Type.DATA);
     public final static Component PRIMARY_INDEX = new Component(Type.PRIMARY_INDEX);
     public final static Component FILTER = new Component(Type.FILTER);
     public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO);
     public final static Component STATS = new Component(Type.STATS);
-    public final static Component DIGEST = new Component(Type.DIGEST);
+    public final static Component DIGEST_CRC32 = new Component(Type.DIGEST, DIGEST_CRC32_NAME);
+    public final static Component DIGEST_ADLER32 = new Component(Type.DIGEST, DIGEST_ADLER32_NAME);
     public final static Component CRC = new Component(Type.CRC);
     public final static Component SUMMARY = new Component(Type.SUMMARY);
     public final static Component TOC = new Component(Type.TOC);
@@ -138,11 +143,23 @@ public class Component
             case FILTER:            component = Component.FILTER;                       break;
             case COMPRESSION_INFO:  component = Component.COMPRESSION_INFO;             break;
             case STATS:             component = Component.STATS;                        break;
-            case DIGEST:            component = Component.DIGEST;                       break;
             case CRC:               component = Component.CRC;                          break;
             case SUMMARY:           component = Component.SUMMARY;                      break;
             case TOC:               component = Component.TOC;                          break;
             case CUSTOM:            component = new Component(Type.CUSTOM, path.right); break;
+            case DIGEST:
+                switch (path.right)
+                {
+                    case DIGEST_CRC32_NAME:
+                        component = Component.DIGEST_CRC32;
+                        break;
+                    case DIGEST_ADLER32_NAME:
+                        component = Component.DIGEST_ADLER32;
+                        break;
+                    default:
+                        throw new IllegalStateException();
+                }
+                break;
             default:
                  throw new IllegalStateException();
         }
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 38829df..0db6f00 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
 import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
 import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
 import org.apache.cassandra.utils.Pair;
+import org.apache.hadoop.mapred.JobTracker;
 
 import static org.apache.cassandra.io.sstable.Component.separator;
 
@@ -344,4 +345,16 @@ public class Descriptor
     {
         return hashCode;
     }
+
+    public Component digestComponent()
+    {
+        switch (version.compressedChecksumType())
+        {
+            case Adler32:
+                return Component.DIGEST_ADLER32;
+            case CRC32:
+                return Component.DIGEST_CRC32;
+        }
+        throw new IllegalStateException();
+    }
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index bd21536..74e4b56 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -131,7 +131,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                 Component.STATS,
                 Component.SUMMARY,
                 Component.TOC,
-                Component.DIGEST));
+                Component.DIGEST_CRC32));
 
         if (metadata.params.bloomFilterFpChance < 1.0)
             components.add(Component.FILTER);
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 9a5eae8..a40c37a 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -122,7 +122,12 @@ public class MetadataSerializer implements IMetadataSerializer
                 in.seek(offset);
                 component = type.serializer.deserialize(descriptor.version, in);
             }
-            components.put(type, component);
+            if (component == null)
+            {
+                assert type != MetadataType.HEADER || !descriptor.version.storeRows();
+            }
+            else
+                components.put(type, component);
         }
         return components;
     }
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 70cd860..b88f4f2 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -109,7 +109,7 @@ public class DataIntegrityMetadata
         {
             this.descriptor = descriptor;
             checksum = descriptor.version.uncompressedChecksumType().newInstance();
-            digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DIGEST)));
+            digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(descriptor.digestComponent())));
             dataReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DATA)));
             try
             {
@@ -211,7 +211,7 @@ public class DataIntegrityMetadata
 
         public void writeFullChecksum(Descriptor descriptor)
         {
-            File outFile = new File(descriptor.filenameFor(Component.DIGEST));
+            File outFile = new File(descriptor.filenameFor(descriptor.digestComponent()));
             try (BufferedWriter out =Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8))
             {
                 out.write(String.valueOf(fullChecksum.getValue()));
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 920eee0..1420cae 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -173,7 +173,7 @@ public class FileUtils
 
     public static void renameWithConfirm(File from, File to)
     {
-        assert from.exists();
+        assert from.exists() : String.format("File to rename does not exist: %s", from.getPath());
         if (logger.isDebugEnabled())
             logger.debug((String.format("Renaming %s to %s", from.getPath(), to.getPath())));
         // this is not FSWE because usually when we see it it's because we didn't close the file before renaming it,
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
index 13ce0c1..0233169 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -275,11 +275,11 @@ public class VerifyTest
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
 
 
-        RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw");
+        RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()), "rw");
         Long correctChecksum = Long.parseLong(file.readLine());
         file.close();
 
-        writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(Component.DIGEST));
+        writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()));
 
         try (Verifier verifier = new Verifier(cfs, sstable, false))
         {
@@ -315,7 +315,7 @@ public class VerifyTest
         file.close();
 
         // Update the Digest to have the right Checksum
-        writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(Component.DIGEST));
+        writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(sstable.descriptor.digestComponent()));
 
         try (Verifier verifier = new Verifier(cfs, sstable, false))
         {
{noformat}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)