You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/09/06 11:33:34 UTC

svn commit: r1165577 - in /cassandra/trunk: ./ interface/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/avro/ src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/io/compress/ src/java/org/...

Author: slebresne
Date: Tue Sep  6 09:33:33 2011
New Revision: 1165577

URL: http://svn.apache.org/viewvc?rev=1165577&view=rev
Log:
Use only one parameters for compression options
patch by slebresne; reviewed by jbellis for CASSANDRA-3128

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/interface/cassandra.thrift
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
    cassandra/trunk/src/avro/internode.genavro
    cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
    cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml
    cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Sep  6 09:33:33 2011
@@ -25,7 +25,7 @@
    (CASSANDRA-2953)
  * add paging to get_count (CASSANDRA-2894)
  * fix "short reads" in [multi]get (CASSANDRA-2643)
- * add optional compression for sstables (CASSANDRA-47)
+ * add optional compression for sstables (CASSANDRA-47, 3001, 3128)
  * add scheduler JMX metrics (CASSANDRA-2962)
  * add block level checksum for compressed data (CASSANDRA-1717)
  * make column family backed column map pluggable and introduce unsynchronized
@@ -53,7 +53,6 @@
  * remove compaction_thread_priority setting (CASSANDRA-3104)
  * generate hints for replicas that timeout, not just replicas that are known
    to be down before starting (CASSANDRA-2034)
- * Make the compression algorithm and chunk length configurable (CASSANDRA-3001)
  * Add throttling for internode streaming (CASSANDRA-3080)
  * make the repair of a range repair all replica (CASSANDRA-2610)
  * expose the ability to repair the first range (as returned by the

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Tue Sep  6 09:33:33 2011
@@ -11,14 +11,13 @@ Upgrading
 
 Features
 --------
-    - "cassandra.bat install" will install Cassandra as a Windows Service
-    - SSTable compression can be enabled by setting "compressionParameters"
-      attribute on CFMetaData (set "compressionParameters" to null to disable compression).
-      SnappyCompressor and DeflateCompressor are currently
-      available as "compression class" and "chunk_length_kb" as compression
-      option. Stress tool supports -I option to provide compressor and
-      CLI `{create/update} column family` commands support "compression"
-      and "compression_options" parameters.
+    - SSTable compression is supported through the 'compression_options'
+      parameter when creating/updating a column family. For instance, you can
+      create a column family Cf using compression (using the Snappy library)
+      in the CLI with:
+        create column family Cf with compression_options={sstable_compression: SnappyCompressor}
+      SSTable compression is not activated by default but can be activated or
+      deactivated at any time.
     - Compressed SSTable blocks are checksummed to protect against bitrot
     - New LevelDB-inspired compaction algorithm can be enabled by setting the 
       Columnfamily compaction_strategy=LeveledCompactionStrategy option.  
@@ -34,6 +33,10 @@ Features
     - A dead node may be replaced in a single step by starting a new node
       with -Dcassandra.replace_token=<token>.  More details can be found at
       http://wiki.apache.org/cassandra/Operations#Replacing_a_Dead_Node
+    - It is now possible to repair only the first range returned by the
+      partitioner for a node with `nodetool repair -pr`. It makes it
+      easier/possible to repair a full cluster without any work duplication by
+      running this command on every node of the cluster.
 
 Other
 -----
@@ -41,10 +44,6 @@ Other
       when HH is enabled, repair only needs to be run if a node crashes.
     - Because of this, read repair is disabled now by default on newly
       created ColumnFamilies.
-    - It is now possible to repair only the first range returned by the
-      partitioner for a node with `nodetool repair -pr`. It makes it
-      easier/possible to repair a full cluster without any work duplication by
-      running this command on every node of the cluster.
 
 
 0.8.5

Modified: cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Tue Sep  6 09:33:33 2011
@@ -399,8 +399,7 @@ struct CfDef {
     29: optional string compaction_strategy,
     30: optional map<string,string> compaction_strategy_options,
     31: optional i32 row_cache_keys_to_save,
-    32: optional string compression,
-    33: optional map<string,string> compression_options,
+    32: optional map<string,string> compression_options,
 }
 
 /* describes a keyspace. */

Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java Tue Sep  6 09:33:33 2011
@@ -9086,8 +9086,6 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
-        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -17043,8 +17041,6 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
-        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -25754,6 +25750,8 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);

Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java Tue Sep  6 09:33:33 2011
@@ -72,8 +72,7 @@ public class CfDef implements org.apache
   private static final org.apache.thrift.protocol.TField COMPACTION_STRATEGY_FIELD_DESC = new org.apache.thrift.protocol.TField("compaction_strategy", org.apache.thrift.protocol.TType.STRING, (short)29);
   private static final org.apache.thrift.protocol.TField COMPACTION_STRATEGY_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("compaction_strategy_options", org.apache.thrift.protocol.TType.MAP, (short)30);
   private static final org.apache.thrift.protocol.TField ROW_CACHE_KEYS_TO_SAVE_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_keys_to_save", org.apache.thrift.protocol.TType.I32, (short)31);
-  private static final org.apache.thrift.protocol.TField COMPRESSION_FIELD_DESC = new org.apache.thrift.protocol.TField("compression", org.apache.thrift.protocol.TType.STRING, (short)32);
-  private static final org.apache.thrift.protocol.TField COMPRESSION_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("compression_options", org.apache.thrift.protocol.TType.MAP, (short)33);
+  private static final org.apache.thrift.protocol.TField COMPRESSION_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("compression_options", org.apache.thrift.protocol.TType.MAP, (short)32);
 
   public String keyspace;
   public String name;
@@ -102,7 +101,6 @@ public class CfDef implements org.apache
   public String compaction_strategy;
   public Map<String,String> compaction_strategy_options;
   public int row_cache_keys_to_save;
-  public String compression;
   public Map<String,String> compression_options;
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@@ -134,8 +132,7 @@ public class CfDef implements org.apache
     COMPACTION_STRATEGY((short)29, "compaction_strategy"),
     COMPACTION_STRATEGY_OPTIONS((short)30, "compaction_strategy_options"),
     ROW_CACHE_KEYS_TO_SAVE((short)31, "row_cache_keys_to_save"),
-    COMPRESSION((short)32, "compression"),
-    COMPRESSION_OPTIONS((short)33, "compression_options");
+    COMPRESSION_OPTIONS((short)32, "compression_options");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -204,9 +201,7 @@ public class CfDef implements org.apache
           return COMPACTION_STRATEGY_OPTIONS;
         case 31: // ROW_CACHE_KEYS_TO_SAVE
           return ROW_CACHE_KEYS_TO_SAVE;
-        case 32: // COMPRESSION
-          return COMPRESSION;
-        case 33: // COMPRESSION_OPTIONS
+        case 32: // COMPRESSION_OPTIONS
           return COMPRESSION_OPTIONS;
         default:
           return null;
@@ -324,8 +319,6 @@ public class CfDef implements org.apache
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
     tmpMap.put(_Fields.ROW_CACHE_KEYS_TO_SAVE, new org.apache.thrift.meta_data.FieldMetaData("row_cache_keys_to_save", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
-    tmpMap.put(_Fields.COMPRESSION, new org.apache.thrift.meta_data.FieldMetaData("compression", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
-        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.COMPRESSION_OPTIONS, new org.apache.thrift.meta_data.FieldMetaData("compression_options", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), 
@@ -434,9 +427,6 @@ public class CfDef implements org.apache
       this.compaction_strategy_options = __this__compaction_strategy_options;
     }
     this.row_cache_keys_to_save = other.row_cache_keys_to_save;
-    if (other.isSetCompression()) {
-      this.compression = other.compression;
-    }
     if (other.isSetCompression_options()) {
       Map<String,String> __this__compression_options = new HashMap<String,String>();
       for (Map.Entry<String, String> other_element : other.compression_options.entrySet()) {
@@ -504,7 +494,6 @@ public class CfDef implements org.apache
     this.compaction_strategy_options = null;
     setRow_cache_keys_to_saveIsSet(false);
     this.row_cache_keys_to_save = 0;
-    this.compression = null;
     this.compression_options = null;
   }
 
@@ -1178,30 +1167,6 @@ public class CfDef implements org.apache
     __isset_bit_vector.set(__ROW_CACHE_KEYS_TO_SAVE_ISSET_ID, value);
   }
 
-  public String getCompression() {
-    return this.compression;
-  }
-
-  public CfDef setCompression(String compression) {
-    this.compression = compression;
-    return this;
-  }
-
-  public void unsetCompression() {
-    this.compression = null;
-  }
-
-  /** Returns true if field compression is set (has been assigned a value) and false otherwise */
-  public boolean isSetCompression() {
-    return this.compression != null;
-  }
-
-  public void setCompressionIsSet(boolean value) {
-    if (!value) {
-      this.compression = null;
-    }
-  }
-
   public int getCompression_optionsSize() {
     return (this.compression_options == null) ? 0 : this.compression_options.size();
   }
@@ -1455,14 +1420,6 @@ public class CfDef implements org.apache
       }
       break;
 
-    case COMPRESSION:
-      if (value == null) {
-        unsetCompression();
-      } else {
-        setCompression((String)value);
-      }
-      break;
-
     case COMPRESSION_OPTIONS:
       if (value == null) {
         unsetCompression_options();
@@ -1557,9 +1514,6 @@ public class CfDef implements org.apache
     case ROW_CACHE_KEYS_TO_SAVE:
       return new Integer(getRow_cache_keys_to_save());
 
-    case COMPRESSION:
-      return getCompression();
-
     case COMPRESSION_OPTIONS:
       return getCompression_options();
 
@@ -1628,8 +1582,6 @@ public class CfDef implements org.apache
       return isSetCompaction_strategy_options();
     case ROW_CACHE_KEYS_TO_SAVE:
       return isSetRow_cache_keys_to_save();
-    case COMPRESSION:
-      return isSetCompression();
     case COMPRESSION_OPTIONS:
       return isSetCompression_options();
     }
@@ -1892,15 +1844,6 @@ public class CfDef implements org.apache
         return false;
     }
 
-    boolean this_present_compression = true && this.isSetCompression();
-    boolean that_present_compression = true && that.isSetCompression();
-    if (this_present_compression || that_present_compression) {
-      if (!(this_present_compression && that_present_compression))
-        return false;
-      if (!this.compression.equals(that.compression))
-        return false;
-    }
-
     boolean this_present_compression_options = true && this.isSetCompression_options();
     boolean that_present_compression_options = true && that.isSetCompression_options();
     if (this_present_compression_options || that_present_compression_options) {
@@ -2052,11 +1995,6 @@ public class CfDef implements org.apache
     if (present_row_cache_keys_to_save)
       builder.append(row_cache_keys_to_save);
 
-    boolean present_compression = true && (isSetCompression());
-    builder.append(present_compression);
-    if (present_compression)
-      builder.append(compression);
-
     boolean present_compression_options = true && (isSetCompression_options());
     builder.append(present_compression_options);
     if (present_compression_options)
@@ -2343,16 +2281,6 @@ public class CfDef implements org.apache
         return lastComparison;
       }
     }
-    lastComparison = Boolean.valueOf(isSetCompression()).compareTo(typedOther.isSetCompression());
-    if (lastComparison != 0) {
-      return lastComparison;
-    }
-    if (isSetCompression()) {
-      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.compression, typedOther.compression);
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-    }
     lastComparison = Boolean.valueOf(isSetCompression_options()).compareTo(typedOther.isSetCompression_options());
     if (lastComparison != 0) {
       return lastComparison;
@@ -2606,14 +2534,7 @@ public class CfDef implements org.apache
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
           }
           break;
-        case 32: // COMPRESSION
-          if (field.type == org.apache.thrift.protocol.TType.STRING) {
-            this.compression = iprot.readString();
-          } else { 
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-          }
-          break;
-        case 33: // COMPRESSION_OPTIONS
+        case 32: // COMPRESSION_OPTIONS
           if (field.type == org.apache.thrift.protocol.TType.MAP) {
             {
               org.apache.thrift.protocol.TMap _map41 = iprot.readMapBegin();
@@ -2819,13 +2740,6 @@ public class CfDef implements org.apache
       oprot.writeI32(this.row_cache_keys_to_save);
       oprot.writeFieldEnd();
     }
-    if (this.compression != null) {
-      if (isSetCompression()) {
-        oprot.writeFieldBegin(COMPRESSION_FIELD_DESC);
-        oprot.writeString(this.compression);
-        oprot.writeFieldEnd();
-      }
-    }
     if (this.compression_options != null) {
       if (isSetCompression_options()) {
         oprot.writeFieldBegin(COMPRESSION_OPTIONS_FIELD_DESC);
@@ -3059,16 +2973,6 @@ public class CfDef implements org.apache
       sb.append(this.row_cache_keys_to_save);
       first = false;
     }
-    if (isSetCompression()) {
-      if (!first) sb.append(", ");
-      sb.append("compression:");
-      if (this.compression == null) {
-        sb.append("null");
-      } else {
-        sb.append(this.compression);
-      }
-      first = false;
-    }
     if (isSetCompression_options()) {
       if (!first) sb.append(", ");
       sb.append("compression_options:");

Modified: cassandra/trunk/src/avro/internode.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/avro/internode.genavro?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/avro/internode.genavro (original)
+++ cassandra/trunk/src/avro/internode.genavro Tue Sep  6 09:33:33 2011
@@ -72,7 +72,6 @@ protocol InterNode {
         union { null, bytes } key_alias = null;
         union { null, string } compaction_strategy = null;
         union { null, map<string> } compaction_strategy_options = null;
-        union { null, string } compression = null;
         union { null, map<string> } compression_options = null;
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Tue Sep  6 09:33:33 2011
@@ -137,7 +137,6 @@ public class CliClient
         KEY_VALIDATION_CLASS,
         COMPACTION_STRATEGY,
         COMPACTION_STRATEGY_OPTIONS,
-        COMPRESSION,
         COMPRESSION_OPTIONS,
     }
 
@@ -1255,9 +1254,6 @@ public class CliClient
             case COMPACTION_STRATEGY_OPTIONS:
                 cfDef.setCompaction_strategy_options(getStrategyOptionsFromTree(statement.getChild(i+1)));
                 break;
-            case COMPRESSION:
-                cfDef.setCompression(mValue.toLowerCase().equals("null") ? null : CliUtils.unescapeSQLString(mValue));
-                break;
             case COMPRESSION_OPTIONS:
                 cfDef.setCompression_options(getStrategyOptionsFromTree(statement.getChild(i+1)));
                 break;
@@ -1936,7 +1932,6 @@ public class CliClient
         sessionState.out.printf("      Compaction min/max thresholds: %s/%s%n", cf_def.min_compaction_threshold, cf_def.max_compaction_threshold);
         sessionState.out.printf("      Read repair chance: %s%n", cf_def.read_repair_chance);
         sessionState.out.printf("      Replicate on write: %s%n", cf_def.replicate_on_write);
-        sessionState.out.printf("      Compression: %s%n", cf_def.compression);
 
         // if we have connection to the cfMBean established
         if (cfMBean != null)
@@ -1987,6 +1982,13 @@ public class CliClient
             for (Map.Entry<String, String> e : cf_def.compaction_strategy_options.entrySet())
                 sessionState.out.printf("        %s: %s%n", e.getKey(), e.getValue());
         }
+
+        if (cf_def.compression_options != null && !cf_def.compression_options.isEmpty())
+        {
+            sessionState.out.println("      Compression Options:");
+            for (Map.Entry<String, String> e : cf_def.compression_options.entrySet())
+                sessionState.out.printf("        %s: %s%n", e.getKey(), e.getValue());
+        }
     }
 
     // DESCRIBE KEYSPACE (<keyspace> | <column_family>)?

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue Sep  6 09:33:33 2011
@@ -345,16 +345,7 @@ public final class CFMetaData
             for (Map.Entry<String, String> e : compactionStrategyOptions.entrySet())
                 cf.compaction_strategy_options.put(new Utf8(e.getKey()), new Utf8(e.getValue()));
         }
-        if (compressionParameters.compressorClass != null)
-        {
-            cf.compression = new Utf8(compressionParameters.compressorClass.getName());
-            if (compressionParameters.compressionOptions != null)
-            {
-                cf.compression_options = new HashMap<CharSequence, CharSequence>();
-                for (Map.Entry<String, String> e : compressionParameters.compressionOptions.entrySet())
-                    cf.compression_options.put(new Utf8(e.getKey()), new Utf8(e.getValue()));
-            }
-        }
+        cf.compression_options = compressionParameters.asAvroOptions();
         return cf;
     }
 
@@ -437,7 +428,7 @@ public final class CFMetaData
         CompressionParameters cp;
         try
         {
-            cp = new CompressionParameters(cf.compression, cf.compression_options);
+            cp = CompressionParameters.create(cf.compression_options);
         }
         catch (ConfigurationException e)
         {
@@ -714,7 +705,7 @@ public final class CFMetaData
         if (cf_def.isSetCompaction_strategy_options())
             newCFMD.compactionStrategyOptions(new HashMap<String, String>(cf_def.compaction_strategy_options));
 
-        CompressionParameters cp = new CompressionParameters(cf_def.compression, cf_def.compression_options);
+        CompressionParameters cp = CompressionParameters.create(cf_def.compression_options);
 
         return newCFMD.comment(cf_def.comment)
                       .rowCacheSize(cf_def.row_cache_size)
@@ -830,7 +821,7 @@ public final class CFMetaData
                 compactionStrategyOptions.put(e.getKey().toString(), e.getValue().toString());
         }
 
-        compressionParameters = new CompressionParameters(cf_def.compression, cf_def.compression_options);
+        compressionParameters = CompressionParameters.create(cf_def.compression_options);
 
         logger.debug("application result is {}", this);
     }
@@ -922,11 +913,7 @@ public final class CFMetaData
         def.setColumn_metadata(column_meta);
         def.setCompaction_strategy(compactionStrategyClass.getName());
         def.setCompaction_strategy_options(new HashMap<String, String>(compactionStrategyOptions));
-        if (compressionParameters.compressorClass != null)
-        {
-            def.setCompression(compressionParameters.compressorClass.getName());
-            def.setCompression_options(compressionParameters.compressionOptions);
-        }
+        def.setCompression_options(compressionParameters.asThriftOptions());
         return def;
     }
 
@@ -1051,8 +1038,7 @@ public final class CFMetaData
             .append("column_metadata", column_metadata)
             .append("compactionStrategyClass", compactionStrategyClass)
             .append("compactionStrategyOptions", compactionStrategyOptions)
-            .append("compressorClass", compressionParameters.compressorClass)
-            .append("compressionOptions", compressionParameters.compressionOptions)
+            .append("compressionOptions", compressionParameters.asThriftOptions())
             .toString();
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java Tue Sep  6 09:33:33 2011
@@ -51,8 +51,8 @@ public class CompressedSequentialWriter 
 
     public CompressedSequentialWriter(File file, String indexFilePath, boolean skipIOCache, CompressionParameters parameters) throws IOException
     {
-        super(file, parameters.chunkLength, skipIOCache);
-        this.compressor = parameters.compressor;
+        super(file, parameters.chunkLength(), skipIOCache);
+        this.compressor = parameters.sstableCompressor;
 
         // buffer for compression should be the same size as buffer itself
         compressed = new ICompressor.WrappedArray(new byte[compressor.initialCompressedBufferLength(buffer.length)]);

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java Tue Sep  6 09:33:33 2011
@@ -54,7 +54,7 @@ public class CompressionMetadata
         int chunkLength = stream.readInt();
         try
         {
-            parameters = new CompressionParameters(compressorName, options, chunkLength);
+            parameters = new CompressionParameters(compressorName, chunkLength, options);
         }
         catch (ConfigurationException e)
         {
@@ -70,12 +70,12 @@ public class CompressionMetadata
 
     public ICompressor compressor()
     {
-        return parameters.compressor;
+        return parameters.sstableCompressor;
     }
 
     public int chunkLength()
     {
-        return parameters.chunkLength;
+        return parameters.chunkLength();
     }
 
     /**
@@ -120,7 +120,7 @@ public class CompressionMetadata
     public Chunk chunkFor(long position) throws IOException
     {
         // position of the chunk
-        int idx = (int) (position / parameters.chunkLength);
+        int idx = (int) (position / parameters.chunkLength());
 
         if (idx >= chunkOffsets.length)
             throw new EOFException();
@@ -146,16 +146,16 @@ public class CompressionMetadata
         public void writeHeader(CompressionParameters parameters) throws IOException
         {
             // algorithm
-            writeUTF(parameters.compressorClass.getSimpleName());
-            writeInt(parameters.compressionOptions.size());
-            for (Map.Entry<String, String> entry : parameters.compressionOptions.entrySet())
+            writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
+            writeInt(parameters.otherOptions.size());
+            for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet())
             {
                 writeUTF(entry.getKey());
                 writeUTF(entry.getValue());
             }
 
             // store the length of the chunk
-            writeInt(parameters.chunkLength);
+            writeInt(parameters.chunkLength());
             // store position and reserve a place for uncompressed data length and chunks count
             dataLengthOffset = getFilePointer();
             writeLong(-1);

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java Tue Sep  6 09:33:33 2011
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.avro.util.Utf8;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 
@@ -32,44 +33,54 @@ import org.apache.cassandra.config.Confi
 public class CompressionParameters
 {
     public final static int DEFAULT_CHUNK_LENGTH = 65536;
-    public static final String CHUNK_LENGTH_PARAMETER = "chunk_length_kb";
 
-    public final Class<? extends ICompressor> compressorClass;
-    public final Map<String, String> compressionOptions;
+    public static final String SSTABLE_COMPRESSION = "sstable_compression";
+    public static final String CHUNK_LENGTH = "chunk_length_kb";
 
-    public final transient ICompressor compressor;
-    public final transient int chunkLength;
+    public final ICompressor sstableCompressor;
+    private final Integer chunkLength;
+    public final Map<String, String> otherOptions; // Unrecognized options, can be use by the compressor
 
-    public CompressionParameters(CharSequence compressorClassName, Map<? extends CharSequence, ? extends CharSequence> options) throws ConfigurationException
+    public static CompressionParameters create(Map<? extends CharSequence, ? extends CharSequence> opts) throws ConfigurationException
     {
-        this(compressorClassName, copyOptions(options), -1);
+        Map<String, String> options = copyOptions(opts);
+        String sstableCompressionClass = options.get(SSTABLE_COMPRESSION);
+        String chunkLength = options.get(CHUNK_LENGTH);
+        options.remove(SSTABLE_COMPRESSION);
+        options.remove(CHUNK_LENGTH);
+        CompressionParameters cp = new CompressionParameters(sstableCompressionClass, parseChunkLength(chunkLength), options);
+        cp.validateChunkLength();
+        return cp;
     }
 
-    public CompressionParameters(CharSequence compressorClassName, Map<String, String> options, int chunkLength) throws ConfigurationException
+    public CompressionParameters(String sstableCompressorClass, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException
     {
-        this(createCompressor(parseCompressorClass(compressorClassName), options), options, chunkLength < 0 ? getChunkLength(options) : chunkLength);
-        validateChunkLength();
+        this(createCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, otherOptions);
     }
 
-    public CompressionParameters(ICompressor compressor)
+    public CompressionParameters(ICompressor sstableCompressor)
     {
-        this(compressor, null, DEFAULT_CHUNK_LENGTH);
+        this(sstableCompressor, null, Collections.<String, String>emptyMap());
     }
 
-    public CompressionParameters(ICompressor compressor, Map<String, String> compressionOptions, int chunkLength)
+    public CompressionParameters(ICompressor sstableCompressor, Integer chunkLength, Map<String, String> otherOptions)
     {
-        this.compressorClass = compressor == null ? null : compressor.getClass();
-        this.compressionOptions = compressor == null ? null : (compressionOptions == null ? Collections.<String, String>emptyMap() : compressionOptions);
+        this.sstableCompressor = sstableCompressor;
         this.chunkLength = chunkLength;
-        this.compressor = compressor;
+        this.otherOptions = otherOptions;
     }
 
-    private static Class<? extends ICompressor> parseCompressorClass(CharSequence cc) throws ConfigurationException
+    public int chunkLength()
     {
-        if (cc == null)
+        return chunkLength == null ? DEFAULT_CHUNK_LENGTH : chunkLength;
+    }
+
+
+    private static Class<? extends ICompressor> parseCompressorClass(String className) throws ConfigurationException
+    {
+        if (className == null)
             return null;
 
-        String className = cc.toString();
         className = className.contains(".") ? className : "org.apache.cassandra.io.compress." + className;
         try
         {
@@ -77,7 +88,7 @@ public class CompressionParameters
         }
         catch (Exception e)
         {
-            throw new ConfigurationException("Could not create Compression for type " + cc.toString(), e);
+            throw new ConfigurationException("Could not create Compression for type " + className, e);
         }
     }
 
@@ -126,21 +137,19 @@ public class CompressionParameters
         return compressionOptions;
     }
 
-    private static int getChunkLength(Map<String, String> options) throws ConfigurationException
+    private static Integer parseChunkLength(String chLength) throws ConfigurationException
     {
-        int chunkLength = DEFAULT_CHUNK_LENGTH;
-        if (options != null && options.containsKey(CHUNK_LENGTH_PARAMETER))
+        if (chLength == null)
+            return null;
+
+        try
         {
-            try
-            {
-                chunkLength = Integer.parseInt(options.get(CHUNK_LENGTH_PARAMETER));
-            }
-            catch (NumberFormatException e)
-            {
-                throw new ConfigurationException("Invalid value for " + CHUNK_LENGTH_PARAMETER, e);
-            }
+            return Integer.parseInt(chLength);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException("Invalid value for " + CHUNK_LENGTH, e);
         }
-        return chunkLength;
     }
 
     // chunkLength must be a power of 2 because we assume so when
@@ -148,8 +157,11 @@ public class CompressionParameters
     // CompressedRandomAccessReader.decompresseChunk())
     private void validateChunkLength() throws ConfigurationException
     {
+        if (chunkLength == null)
+            return; // chunk length not set, this is fine, default will be used
+
         if (chunkLength <= 0)
-            throw new ConfigurationException("Invalid negative or null " + CHUNK_LENGTH_PARAMETER);
+            throw new ConfigurationException("Invalid negative or null " + CHUNK_LENGTH);
 
         int c = chunkLength;
         boolean found = false;
@@ -158,7 +170,7 @@ public class CompressionParameters
             if ((c & 0x01) != 0)
             {
                 if (found)
-                    throw new ConfigurationException(CHUNK_LENGTH_PARAMETER + " must be a power of 2");
+                    throw new ConfigurationException(CHUNK_LENGTH + " must be a power of 2");
                 else
                     found = true;
             }
@@ -166,6 +178,33 @@ public class CompressionParameters
         }
     }
 
+    public Map<CharSequence, CharSequence> asAvroOptions()
+    {
+        Map<CharSequence, CharSequence> options = new HashMap<CharSequence, CharSequence>();
+        for (Map.Entry<String, String> entry : otherOptions.entrySet())
+            options.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
+
+        if (sstableCompressor == null)
+            return options;
+
+        options.put(new Utf8(SSTABLE_COMPRESSION), new Utf8(sstableCompressor.getClass().getName()));
+        if (chunkLength != null)
+            options.put(new Utf8(CHUNK_LENGTH), new Utf8(chunkLength.toString()));
+        return options;
+    }
+
+    public Map<String, String> asThriftOptions()
+    {
+        Map<String, String> options = new HashMap<String, String>(otherOptions);
+        if (sstableCompressor == null)
+            return options;
+
+        options.put(SSTABLE_COMPRESSION, sstableCompressor.getClass().getName());
+        if (chunkLength != null)
+            options.put(CHUNK_LENGTH, chunkLength.toString());
+        return options;
+    }
+
     @Override
     public boolean equals(Object obj)
     {
@@ -180,8 +219,9 @@ public class CompressionParameters
 
         CompressionParameters cp = (CompressionParameters) obj;
         return new EqualsBuilder()
-            .append(compressorClass, cp.compressorClass)
-            .append(compressionOptions, cp.compressionOptions)
+            .append(sstableCompressor, cp.sstableCompressor)
+            .append(chunkLength, cp.chunkLength)
+            .append(otherOptions, cp.otherOptions)
             .isEquals();
     }
 
@@ -189,8 +229,9 @@ public class CompressionParameters
     public int hashCode()
     {
         return new HashCodeBuilder(29, 1597)
-            .append(compressorClass)
-            .append(compressionOptions)
+            .append(sstableCompressor)
+            .append(chunkLength)
+            .append(otherOptions)
             .toHashCode();
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Sep  6 09:33:33 2011
@@ -64,7 +64,7 @@ public class SSTableWriter extends SSTab
     private static Set<Component> components(CFMetaData metadata)
     {
         Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.STATS));
-        if (metadata.compressionParameters().compressor != null)
+        if (metadata.compressionParameters().sstableCompressor != null)
             components.add(Component.COMPRESSION_INFO);
         return components;
     }

Modified: cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml (original)
+++ cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml Tue Sep  6 09:33:33 2011
@@ -569,11 +569,14 @@ commands:
         It is also valid to specify the fully-qualified class name to a class
         that implements org.apache.cassandra.io.ICompressor.
 
-        - compression_options: Optional additional options for compression.
-          Options have the form [{key:value}], and may depends on the
-          compression algorithm used. One generic option is chunk_length_kb
-          that allows to specify the size of the chunk used by compression
-          (default to 64, must be a power of 2).
+        - compression_options: Options related to compression.
+          Options have the form [{key:value}]. The main recognized option are:
+          - sstable_compression: the algorithm to use to compress sstables for
+          this column family. If none is provided, compression will not be
+          enabled. Supported values are SnappyCompressor, DeflateCompressor or
+          any custom compressor.
+          - chunk_length_kb: specify the size of the chunk used by sstable
+          compression (default to 64, must be a power of 2).
 
         Examples:
         create column family Super4

Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java Tue Sep  6 09:33:33 2011
@@ -447,10 +447,13 @@ public class Session implements Serializ
 
         // column family for standard columns
         CfDef standardCfDef = new CfDef("Keyspace1", "Standard1");
+        Map<String, String> compressionOptions = new HashMap<String, String>();
+        if (compression != null)
+            compressionOptions.put("sstable_compression", compression);
 
         standardCfDef.setComparator_type(DEFAULT_COMPARATOR)
                      .setDefault_validation_class(DEFAULT_VALIDATOR)
-                     .setCompression(compression);
+                     .setCompression_options(compressionOptions);
 
         if (indexType != null)
         {
@@ -464,13 +467,13 @@ public class Session implements Serializ
         superCfDef.setComparator_type(DEFAULT_COMPARATOR)
                   .setSubcomparator_type(DEFAULT_COMPARATOR)
                   .setDefault_validation_class(DEFAULT_VALIDATOR)
-                  .setCompression(compression);
+                  .setCompression_options(compressionOptions);
 
         // column family for standard counters
-        CfDef counterCfDef = new CfDef("Keyspace1", "Counter1").setDefault_validation_class("CounterColumnType").setReplicate_on_write(replicateOnWrite).setCompression(compression);
+        CfDef counterCfDef = new CfDef("Keyspace1", "Counter1").setDefault_validation_class("CounterColumnType").setReplicate_on_write(replicateOnWrite).setCompression_options(compressionOptions);
 
         // column family with counter super columns
-        CfDef counterSuperCfDef = new CfDef("Keyspace1", "SuperCounter1").setDefault_validation_class("CounterColumnType").setReplicate_on_write(replicateOnWrite).setColumn_type("Super").setCompression(compression);
+        CfDef counterSuperCfDef = new CfDef("Keyspace1", "SuperCounter1").setDefault_validation_class("CounterColumnType").setReplicate_on_write(replicateOnWrite).setColumn_type("Super").setCompression_options(compressionOptions);
 
         keyspace.setName("Keyspace1");
         keyspace.setStrategy_class(replicationStrategy);