You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/10/23 22:16:35 UTC

git commit: support custom sstable components; patch by Piotr Kołaczkowski; reviewed by jbellis for CASSANDRA-4049

Updated Branches:
  refs/heads/trunk b8de48851 -> 5eb9e1c15


support custom sstable components
patch by Piotr Kołaczkowski; reviewed by jbellis for CASSANDRA-4049


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5eb9e1c1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5eb9e1c1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5eb9e1c1

Branch: refs/heads/trunk
Commit: 5eb9e1c1576edb90f5b6e2ef975686e87a8c93af
Parents: b8de488
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 23 15:16:03 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 23 15:16:03 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/io/sstable/Component.java |   59 +++++----
 .../org/apache/cassandra/io/sstable/SSTable.java   |  111 ++++++++++++---
 .../apache/cassandra/io/sstable/SSTableWriter.java |   12 +-
 3 files changed, 134 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb9e1c1/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 7a001ab..cbc12d9 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -34,7 +34,7 @@ public class Component
     public static final char separator = '-';
 
     final static EnumSet<Type> TYPES = EnumSet.allOf(Type.class);
-    enum Type
+    public enum Type
     {
         // the base data for an sstable: the remaining components can be regenerated
         // based on the data component
@@ -54,7 +54,11 @@ public class Component
         // holds sha1 sum of the data file (to be checked by sha1sum)
         DIGEST("Digest.sha1"),
         // holds SSTable Index Summary and Boundaries
-        SUMMARY("Summary.db");
+        SUMMARY("Summary.db"),
+        // table of contents, stores the list of all components for the sstable
+        TOC("TOC.txt"),
+        // custom component, used by e.g. custom compaction strategy
+        CUSTOM(null);
 
         final String repr;
         Type(String repr)
@@ -67,34 +71,37 @@ public class Component
             for (Type type : TYPES)
                 if (repr.equals(type.repr))
                     return type;
-            throw new RuntimeException("Invalid SSTable component: '" + repr + "'");
+            return CUSTOM;
         }
     }
 
     // singleton components for types that don't need ids
-    public final static Component DATA = new Component(Type.DATA, -1);
-    public final static Component PRIMARY_INDEX = new Component(Type.PRIMARY_INDEX, -1);
-    public final static Component FILTER = new Component(Type.FILTER, -1);
-    public final static Component COMPACTED_MARKER = new Component(Type.COMPACTED_MARKER, -1);
-    public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO, -1);
-    public final static Component STATS = new Component(Type.STATS, -1);
-    public final static Component DIGEST = new Component(Type.DIGEST, -1);
-    public final static Component SUMMARY = new Component(Type.SUMMARY, -1);
+    public final static Component DATA = new Component(Type.DATA);
+    public final static Component PRIMARY_INDEX = new Component(Type.PRIMARY_INDEX);
+    public final static Component FILTER = new Component(Type.FILTER);
+    public final static Component COMPACTED_MARKER = new Component(Type.COMPACTED_MARKER);
+    public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO);
+    public final static Component STATS = new Component(Type.STATS);
+    public final static Component DIGEST = new Component(Type.DIGEST);
+    public final static Component SUMMARY = new Component(Type.SUMMARY);
+    public final static Component TOC = new Component(Type.TOC);
 
     public final Type type;
-    public final int id;
+    public final String name;
     public final int hashCode;
 
     public Component(Type type)
     {
-        this(type, -1);
+        this(type, type.repr);
+        assert type != Type.CUSTOM;
     }
 
-    public Component(Type type, int id)
+    public Component(Type type, String name)
     {
+        assert name != null : "Component name cannot be null";
         this.type = type;
-        this.id = id;
-        this.hashCode = Objects.hashCode(type, id);
+        this.name = name;
+        this.hashCode = Objects.hashCode(type, name);
     }
 
     /**
@@ -102,7 +109,7 @@ public class Component
      */
     public String name()
     {
-        return type.repr;
+        return name;
     }
 
     /**
@@ -120,14 +127,16 @@ public class Component
         Component component;
         switch(type)
         {
-            case DATA:              component = Component.DATA;             break;
-            case PRIMARY_INDEX:     component = Component.PRIMARY_INDEX;    break;
-            case FILTER:            component = Component.FILTER;           break;
-            case COMPACTED_MARKER:  component = Component.COMPACTED_MARKER; break;
-            case COMPRESSION_INFO:  component = Component.COMPRESSION_INFO; break;
-            case STATS:             component = Component.STATS;            break;
-            case DIGEST:            component = Component.DIGEST;           break;
+            case DATA:              component = Component.DATA;                         break;
+            case PRIMARY_INDEX:     component = Component.PRIMARY_INDEX;                break;
+            case FILTER:            component = Component.FILTER;                       break;
+            case COMPACTED_MARKER:  component = Component.COMPACTED_MARKER;             break;
+            case COMPRESSION_INFO:  component = Component.COMPRESSION_INFO;             break;
+            case STATS:             component = Component.STATS;                        break;
+            case DIGEST:            component = Component.DIGEST;                       break;
             case SUMMARY:           component = Component.SUMMARY;          break;
+            case TOC:               component = Component.TOC;                          break;
+            case CUSTOM:            component = new Component(Type.CUSTOM, path.right); break;
             default:
                  throw new IllegalStateException();
         }
@@ -149,7 +158,7 @@ public class Component
         if (!(o instanceof Component))
             return false;
         Component that = (Component)o;
-        return this.type == that.type && this.id == that.id;
+        return this.type == that.type && this.name.equals(that.name);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb9e1c1/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 8b4bafb..00bc2d7 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -17,20 +17,24 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
+import java.nio.charset.Charset;
 import java.util.*;
+import java.util.concurrent.CopyOnWriteArraySet;
 
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
-
-import org.apache.cassandra.db.DecoratedKey;
+import com.google.common.io.Files;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -103,7 +107,7 @@ public abstract class SSTable
             assert component.type != Component.Type.COMPACTED_MARKER;
 
         this.compression = dataComponents.contains(Component.COMPRESSION_INFO);
-        this.components = Collections.unmodifiableSet(dataComponents);
+        this.components = new CopyOnWriteArraySet<Component>(dataComponents);
         this.metadata = metadata;
         this.partitioner = partitioner;
     }
@@ -183,36 +187,49 @@ public abstract class SSTable
     }
 
     /**
-     * @return A Descriptor,Component pair, or null if not a valid sstable component.
+     * @return A Descriptor,Component pair. If component is of unknown type, returns CUSTOM component.
      */
     public static Pair<Descriptor,Component> tryComponentFromFilename(File dir, String name)
     {
+        return Component.fromFilename(dir, name);
+    }
+
+    /**
+     * Discovers existing components for the descriptor. Slow: only intended for use outside the critical path.
+     */
+    static Set<Component> componentsFor(final Descriptor desc)
+    {
         try
         {
-            return Component.fromFilename(dir, name);
+            try
+            {
+                return readTOC(desc);
+            }
+            catch (FileNotFoundException e)
+            {
+                Set<Component> components = discoverComponentsFor(desc);
+                if (!components.contains(Component.TOC))
+                    components.add(Component.TOC);
+                appendTOC(desc, components);
+                return components;
+            }
         }
-        catch (Exception e)
+        catch (IOException e)
         {
-            if (!"snapshots".equals(name) && !"backups".equals(name)
-                    && !name.contains(".json"))
-                logger.warn("Invalid file '{}' in data directory {}.", name, dir);
-            return null;
+            throw new IOError(e);
         }
     }
 
-    /**
-     * Discovers existing components for the descriptor. Slow: only intended for use outside the critical path.
-     */
-    static Set<Component> componentsFor(final Descriptor desc)
+    private static Set<Component> discoverComponentsFor(Descriptor desc)
     {
-        Set<Component> components = Sets.newHashSetWithExpectedSize(Component.TYPES.size());
-        for (Component.Type componentType : Component.TYPES)
+        Set<Component.Type> knownTypes = Sets.difference(Component.TYPES, Collections.singleton(Component.Type.CUSTOM));
+        Set<Component> components = Sets.newHashSetWithExpectedSize(knownTypes.size());
+        for (Component.Type componentType : knownTypes)
         {
             Component component = new Component(componentType);
             if (new File(desc.filenameFor(component)).exists())
                 components.add(component);
         }
-
         return components;
     }
 
@@ -261,4 +278,60 @@ public abstract class SSTable
                "path='" + getFilename() + '\'' +
                ')';
     }
+
+    /**
+     * Reads the list of components from the TOC component.
+     * @return set of components found in the TOC
+     */
+    protected static Set<Component> readTOC(Descriptor descriptor) throws IOException
+    {
+        File tocFile = new File(descriptor.filenameFor(Component.TOC));
+        List<String> componentNames = Files.readLines(tocFile, Charset.defaultCharset());
+        Set<Component> components = Sets.newHashSetWithExpectedSize(componentNames.size());
+        for (String componentName : componentNames)
+        {
+            Component component = new Component(Component.Type.fromRepresentation(componentName), componentName);
+            if (!new File(descriptor.filenameFor(component)).exists())
+                logger.error("Missing component: " + descriptor.filenameFor(component));
+            else
+                components.add(component);
+        }
+        return components;
+    }
+
+    /**
+     * Appends new component names to the TOC component.
+     */
+    protected static void appendTOC(Descriptor descriptor, Collection<Component> components)
+    {
+        File tocFile = new File(descriptor.filenameFor(Component.TOC));
+        PrintWriter w = null;
+        try
+        {
+            w = new PrintWriter(new FileWriter(tocFile, true));
+            for (Component component : components)
+                w.println(component.name);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, tocFile);
+        }
+        finally
+        {
+            FileUtils.closeQuietly(w);
+        }
+    }
+
+    /**
+     * Registers new custom components. Used by custom compaction strategies.
+     * Adding a component for the second time is a no-op.
+     * Don't remove this - this method is a part of the public API, intended for use by custom compaction strategies.
+     * @param newComponents collection of components to be added
+     */
+    public synchronized void addComponents(Collection<Component> newComponents)
+    {
+        Collection<Component> componentsToAdd = Collections2.filter(newComponents, Predicates.not(Predicates.in(components)));
+        appendTOC(descriptor, componentsToAdd);
+        components.addAll(componentsToAdd);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5eb9e1c1/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 06e6826..c17de4c 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -62,10 +62,11 @@ public class SSTableWriter extends SSTable
     private static Set<Component> components(CFMetaData metadata)
     {
         Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
-                                                                         Component.FILTER,
-                                                                         Component.PRIMARY_INDEX,
-                                                                         Component.STATS,
-                                                                         Component.SUMMARY));
+                                                                 Component.FILTER,
+                                                                 Component.PRIMARY_INDEX,
+                                                                 Component.STATS,
+                                                                 Component.SUMMARY,
+                                                                 Component.TOC));
 
         if (metadata.compressionParameters().sstableCompressor != null)
             components.add(Component.COMPRESSION_INFO);
@@ -324,6 +325,9 @@ public class SSTableWriter extends SSTable
         writeMetadata(descriptor, sstableMetadata);
         maybeWriteDigest();
 
+        // save the table of components
+        SSTable.appendTOC(descriptor, components);
+
         // remove the 'tmp' marker from all components
         final Descriptor newdesc = rename(descriptor, components);