You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/09/06 11:33:34 UTC
svn commit: r1165577 - in /cassandra/trunk: ./ interface/
interface/thrift/gen-java/org/apache/cassandra/thrift/ src/avro/
src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/io/compress/ src/java/org/...
Author: slebresne
Date: Tue Sep 6 09:33:33 2011
New Revision: 1165577
URL: http://svn.apache.org/viewvc?rev=1165577&view=rev
Log:
Use only one parameters for compression options
patch by slebresne; reviewed by jbellis for CASSANDRA-3128
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/NEWS.txt
cassandra/trunk/interface/cassandra.thrift
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
cassandra/trunk/src/avro/internode.genavro
cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml
cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Sep 6 09:33:33 2011
@@ -25,7 +25,7 @@
(CASSANDRA-2953)
* add paging to get_count (CASSANDRA-2894)
* fix "short reads" in [multi]get (CASSANDRA-2643)
- * add optional compression for sstables (CASSANDRA-47)
+ * add optional compression for sstables (CASSANDRA-47, 3001, 3128)
* add scheduler JMX metrics (CASSANDRA-2962)
* add block level checksum for compressed data (CASSANDRA-1717)
* make column family backed column map pluggable and introduce unsynchronized
@@ -53,7 +53,6 @@
* remove compaction_thread_priority setting (CASSANDRA-3104)
* generate hints for replicas that timeout, not just replicas that are known
to be down before starting (CASSANDRA-2034)
- * Make the compression algorithm and chunk length configurable (CASSANDRA-3001)
* Add throttling for internode streaming (CASSANDRA-3080)
* make the repair of a range repair all replica (CASSANDRA-2610)
* expose the ability to repair the first range (as returned by the
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Tue Sep 6 09:33:33 2011
@@ -11,14 +11,13 @@ Upgrading
Features
--------
- - "cassandra.bat install" will install Cassandra as a Windows Service
- - SSTable compression can be enabled by setting "compressionParameters"
- attribute on CFMetaData (set "compressionParameters" to null to disable compression).
- SnappyCompressor and DeflateCompressor are currently
- available as "compression class" and "chunk_length_kb" as compression
- option. Stress tool supports -I option to provide compressor and
- CLI `{create/update} column family` commands support "compression"
- and "compression_options" parameters.
+ - SSTable compression is supported through the 'compression_options'
+ parameter when creating/updating a column family. For instance, you can
+ create a column family Cf using compression (using the Snappy library)
+ in the CLI with:
+ create column family Cf with compression_options={sstable_compression: SnappyCompressor}
+ SSTable compression is not activated by default but can be activated or
+ deactivated at any time.
- Compressed SSTable blocks are checksummed to protect against bitrot
- New LevelDB-inspired compaction algorithm can be enabled by setting the
Columnfamily compaction_strategy=LeveledCompactionStrategy option.
@@ -34,6 +33,10 @@ Features
- A dead node may be replaced in a single step by starting a new node
with -Dcassandra.replace_token=<token>. More details can be found at
http://wiki.apache.org/cassandra/Operations#Replacing_a_Dead_Node
+ - It is now possible to repair only the first range returned by the
+ partitioner for a node with `nodetool repair -pr`. It makes it
+ easier/possible to repair a full cluster without any work duplication by
+ running this command on every node of the cluster.
Other
-----
@@ -41,10 +44,6 @@ Other
when HH is enabled, repair only needs to be run if a node crashes.
- Because of this, read repair is disabled now by default on newly
created ColumnFamilies.
- - It is now possible to repair only the first range returned by the
- partitioner for a node with `nodetool repair -pr`. It makes it
- easier/possible to repair a full cluster without any work duplication by
- running this command on every node of the cluster.
0.8.5
Modified: cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Tue Sep 6 09:33:33 2011
@@ -399,8 +399,7 @@ struct CfDef {
29: optional string compaction_strategy,
30: optional map<string,string> compaction_strategy_options,
31: optional i32 row_cache_keys_to_save,
- 32: optional string compression,
- 33: optional map<string,string> compression_options,
+ 32: optional map<string,string> compression_options,
}
/* describes a keyspace. */
Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java Tue Sep 6 09:33:33 2011
@@ -9086,8 +9086,6 @@ public class Cassandra {
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -17043,8 +17041,6 @@ public class Cassandra {
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -25754,6 +25750,8 @@ public class Cassandra {
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java Tue Sep 6 09:33:33 2011
@@ -72,8 +72,7 @@ public class CfDef implements org.apache
private static final org.apache.thrift.protocol.TField COMPACTION_STRATEGY_FIELD_DESC = new org.apache.thrift.protocol.TField("compaction_strategy", org.apache.thrift.protocol.TType.STRING, (short)29);
private static final org.apache.thrift.protocol.TField COMPACTION_STRATEGY_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("compaction_strategy_options", org.apache.thrift.protocol.TType.MAP, (short)30);
private static final org.apache.thrift.protocol.TField ROW_CACHE_KEYS_TO_SAVE_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_keys_to_save", org.apache.thrift.protocol.TType.I32, (short)31);
- private static final org.apache.thrift.protocol.TField COMPRESSION_FIELD_DESC = new org.apache.thrift.protocol.TField("compression", org.apache.thrift.protocol.TType.STRING, (short)32);
- private static final org.apache.thrift.protocol.TField COMPRESSION_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("compression_options", org.apache.thrift.protocol.TType.MAP, (short)33);
+ private static final org.apache.thrift.protocol.TField COMPRESSION_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("compression_options", org.apache.thrift.protocol.TType.MAP, (short)32);
public String keyspace;
public String name;
@@ -102,7 +101,6 @@ public class CfDef implements org.apache
public String compaction_strategy;
public Map<String,String> compaction_strategy_options;
public int row_cache_keys_to_save;
- public String compression;
public Map<String,String> compression_options;
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@@ -134,8 +132,7 @@ public class CfDef implements org.apache
COMPACTION_STRATEGY((short)29, "compaction_strategy"),
COMPACTION_STRATEGY_OPTIONS((short)30, "compaction_strategy_options"),
ROW_CACHE_KEYS_TO_SAVE((short)31, "row_cache_keys_to_save"),
- COMPRESSION((short)32, "compression"),
- COMPRESSION_OPTIONS((short)33, "compression_options");
+ COMPRESSION_OPTIONS((short)32, "compression_options");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -204,9 +201,7 @@ public class CfDef implements org.apache
return COMPACTION_STRATEGY_OPTIONS;
case 31: // ROW_CACHE_KEYS_TO_SAVE
return ROW_CACHE_KEYS_TO_SAVE;
- case 32: // COMPRESSION
- return COMPRESSION;
- case 33: // COMPRESSION_OPTIONS
+ case 32: // COMPRESSION_OPTIONS
return COMPRESSION_OPTIONS;
default:
return null;
@@ -324,8 +319,6 @@ public class CfDef implements org.apache
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
tmpMap.put(_Fields.ROW_CACHE_KEYS_TO_SAVE, new org.apache.thrift.meta_data.FieldMetaData("row_cache_keys_to_save", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
- tmpMap.put(_Fields.COMPRESSION, new org.apache.thrift.meta_data.FieldMetaData("compression", org.apache.thrift.TFieldRequirementType.OPTIONAL,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.COMPRESSION_OPTIONS, new org.apache.thrift.meta_data.FieldMetaData("compression_options", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
@@ -434,9 +427,6 @@ public class CfDef implements org.apache
this.compaction_strategy_options = __this__compaction_strategy_options;
}
this.row_cache_keys_to_save = other.row_cache_keys_to_save;
- if (other.isSetCompression()) {
- this.compression = other.compression;
- }
if (other.isSetCompression_options()) {
Map<String,String> __this__compression_options = new HashMap<String,String>();
for (Map.Entry<String, String> other_element : other.compression_options.entrySet()) {
@@ -504,7 +494,6 @@ public class CfDef implements org.apache
this.compaction_strategy_options = null;
setRow_cache_keys_to_saveIsSet(false);
this.row_cache_keys_to_save = 0;
- this.compression = null;
this.compression_options = null;
}
@@ -1178,30 +1167,6 @@ public class CfDef implements org.apache
__isset_bit_vector.set(__ROW_CACHE_KEYS_TO_SAVE_ISSET_ID, value);
}
- public String getCompression() {
- return this.compression;
- }
-
- public CfDef setCompression(String compression) {
- this.compression = compression;
- return this;
- }
-
- public void unsetCompression() {
- this.compression = null;
- }
-
- /** Returns true if field compression is set (has been assigned a value) and false otherwise */
- public boolean isSetCompression() {
- return this.compression != null;
- }
-
- public void setCompressionIsSet(boolean value) {
- if (!value) {
- this.compression = null;
- }
- }
-
public int getCompression_optionsSize() {
return (this.compression_options == null) ? 0 : this.compression_options.size();
}
@@ -1455,14 +1420,6 @@ public class CfDef implements org.apache
}
break;
- case COMPRESSION:
- if (value == null) {
- unsetCompression();
- } else {
- setCompression((String)value);
- }
- break;
-
case COMPRESSION_OPTIONS:
if (value == null) {
unsetCompression_options();
@@ -1557,9 +1514,6 @@ public class CfDef implements org.apache
case ROW_CACHE_KEYS_TO_SAVE:
return new Integer(getRow_cache_keys_to_save());
- case COMPRESSION:
- return getCompression();
-
case COMPRESSION_OPTIONS:
return getCompression_options();
@@ -1628,8 +1582,6 @@ public class CfDef implements org.apache
return isSetCompaction_strategy_options();
case ROW_CACHE_KEYS_TO_SAVE:
return isSetRow_cache_keys_to_save();
- case COMPRESSION:
- return isSetCompression();
case COMPRESSION_OPTIONS:
return isSetCompression_options();
}
@@ -1892,15 +1844,6 @@ public class CfDef implements org.apache
return false;
}
- boolean this_present_compression = true && this.isSetCompression();
- boolean that_present_compression = true && that.isSetCompression();
- if (this_present_compression || that_present_compression) {
- if (!(this_present_compression && that_present_compression))
- return false;
- if (!this.compression.equals(that.compression))
- return false;
- }
-
boolean this_present_compression_options = true && this.isSetCompression_options();
boolean that_present_compression_options = true && that.isSetCompression_options();
if (this_present_compression_options || that_present_compression_options) {
@@ -2052,11 +1995,6 @@ public class CfDef implements org.apache
if (present_row_cache_keys_to_save)
builder.append(row_cache_keys_to_save);
- boolean present_compression = true && (isSetCompression());
- builder.append(present_compression);
- if (present_compression)
- builder.append(compression);
-
boolean present_compression_options = true && (isSetCompression_options());
builder.append(present_compression_options);
if (present_compression_options)
@@ -2343,16 +2281,6 @@ public class CfDef implements org.apache
return lastComparison;
}
}
- lastComparison = Boolean.valueOf(isSetCompression()).compareTo(typedOther.isSetCompression());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetCompression()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.compression, typedOther.compression);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
lastComparison = Boolean.valueOf(isSetCompression_options()).compareTo(typedOther.isSetCompression_options());
if (lastComparison != 0) {
return lastComparison;
@@ -2606,14 +2534,7 @@ public class CfDef implements org.apache
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
break;
- case 32: // COMPRESSION
- if (field.type == org.apache.thrift.protocol.TType.STRING) {
- this.compression = iprot.readString();
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- break;
- case 33: // COMPRESSION_OPTIONS
+ case 32: // COMPRESSION_OPTIONS
if (field.type == org.apache.thrift.protocol.TType.MAP) {
{
org.apache.thrift.protocol.TMap _map41 = iprot.readMapBegin();
@@ -2819,13 +2740,6 @@ public class CfDef implements org.apache
oprot.writeI32(this.row_cache_keys_to_save);
oprot.writeFieldEnd();
}
- if (this.compression != null) {
- if (isSetCompression()) {
- oprot.writeFieldBegin(COMPRESSION_FIELD_DESC);
- oprot.writeString(this.compression);
- oprot.writeFieldEnd();
- }
- }
if (this.compression_options != null) {
if (isSetCompression_options()) {
oprot.writeFieldBegin(COMPRESSION_OPTIONS_FIELD_DESC);
@@ -3059,16 +2973,6 @@ public class CfDef implements org.apache
sb.append(this.row_cache_keys_to_save);
first = false;
}
- if (isSetCompression()) {
- if (!first) sb.append(", ");
- sb.append("compression:");
- if (this.compression == null) {
- sb.append("null");
- } else {
- sb.append(this.compression);
- }
- first = false;
- }
if (isSetCompression_options()) {
if (!first) sb.append(", ");
sb.append("compression_options:");
Modified: cassandra/trunk/src/avro/internode.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/avro/internode.genavro?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/avro/internode.genavro (original)
+++ cassandra/trunk/src/avro/internode.genavro Tue Sep 6 09:33:33 2011
@@ -72,7 +72,6 @@ protocol InterNode {
union { null, bytes } key_alias = null;
union { null, string } compaction_strategy = null;
union { null, map<string> } compaction_strategy_options = null;
- union { null, string } compression = null;
union { null, map<string> } compression_options = null;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Tue Sep 6 09:33:33 2011
@@ -137,7 +137,6 @@ public class CliClient
KEY_VALIDATION_CLASS,
COMPACTION_STRATEGY,
COMPACTION_STRATEGY_OPTIONS,
- COMPRESSION,
COMPRESSION_OPTIONS,
}
@@ -1255,9 +1254,6 @@ public class CliClient
case COMPACTION_STRATEGY_OPTIONS:
cfDef.setCompaction_strategy_options(getStrategyOptionsFromTree(statement.getChild(i+1)));
break;
- case COMPRESSION:
- cfDef.setCompression(mValue.toLowerCase().equals("null") ? null : CliUtils.unescapeSQLString(mValue));
- break;
case COMPRESSION_OPTIONS:
cfDef.setCompression_options(getStrategyOptionsFromTree(statement.getChild(i+1)));
break;
@@ -1936,7 +1932,6 @@ public class CliClient
sessionState.out.printf(" Compaction min/max thresholds: %s/%s%n", cf_def.min_compaction_threshold, cf_def.max_compaction_threshold);
sessionState.out.printf(" Read repair chance: %s%n", cf_def.read_repair_chance);
sessionState.out.printf(" Replicate on write: %s%n", cf_def.replicate_on_write);
- sessionState.out.printf(" Compression: %s%n", cf_def.compression);
// if we have connection to the cfMBean established
if (cfMBean != null)
@@ -1987,6 +1982,13 @@ public class CliClient
for (Map.Entry<String, String> e : cf_def.compaction_strategy_options.entrySet())
sessionState.out.printf(" %s: %s%n", e.getKey(), e.getValue());
}
+
+ if (cf_def.compression_options != null && !cf_def.compression_options.isEmpty())
+ {
+ sessionState.out.println(" Compression Options:");
+ for (Map.Entry<String, String> e : cf_def.compression_options.entrySet())
+ sessionState.out.printf(" %s: %s%n", e.getKey(), e.getValue());
+ }
}
// DESCRIBE KEYSPACE (<keyspace> | <column_family>)?
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue Sep 6 09:33:33 2011
@@ -345,16 +345,7 @@ public final class CFMetaData
for (Map.Entry<String, String> e : compactionStrategyOptions.entrySet())
cf.compaction_strategy_options.put(new Utf8(e.getKey()), new Utf8(e.getValue()));
}
- if (compressionParameters.compressorClass != null)
- {
- cf.compression = new Utf8(compressionParameters.compressorClass.getName());
- if (compressionParameters.compressionOptions != null)
- {
- cf.compression_options = new HashMap<CharSequence, CharSequence>();
- for (Map.Entry<String, String> e : compressionParameters.compressionOptions.entrySet())
- cf.compression_options.put(new Utf8(e.getKey()), new Utf8(e.getValue()));
- }
- }
+ cf.compression_options = compressionParameters.asAvroOptions();
return cf;
}
@@ -437,7 +428,7 @@ public final class CFMetaData
CompressionParameters cp;
try
{
- cp = new CompressionParameters(cf.compression, cf.compression_options);
+ cp = CompressionParameters.create(cf.compression_options);
}
catch (ConfigurationException e)
{
@@ -714,7 +705,7 @@ public final class CFMetaData
if (cf_def.isSetCompaction_strategy_options())
newCFMD.compactionStrategyOptions(new HashMap<String, String>(cf_def.compaction_strategy_options));
- CompressionParameters cp = new CompressionParameters(cf_def.compression, cf_def.compression_options);
+ CompressionParameters cp = CompressionParameters.create(cf_def.compression_options);
return newCFMD.comment(cf_def.comment)
.rowCacheSize(cf_def.row_cache_size)
@@ -830,7 +821,7 @@ public final class CFMetaData
compactionStrategyOptions.put(e.getKey().toString(), e.getValue().toString());
}
- compressionParameters = new CompressionParameters(cf_def.compression, cf_def.compression_options);
+ compressionParameters = CompressionParameters.create(cf_def.compression_options);
logger.debug("application result is {}", this);
}
@@ -922,11 +913,7 @@ public final class CFMetaData
def.setColumn_metadata(column_meta);
def.setCompaction_strategy(compactionStrategyClass.getName());
def.setCompaction_strategy_options(new HashMap<String, String>(compactionStrategyOptions));
- if (compressionParameters.compressorClass != null)
- {
- def.setCompression(compressionParameters.compressorClass.getName());
- def.setCompression_options(compressionParameters.compressionOptions);
- }
+ def.setCompression_options(compressionParameters.asThriftOptions());
return def;
}
@@ -1051,8 +1038,7 @@ public final class CFMetaData
.append("column_metadata", column_metadata)
.append("compactionStrategyClass", compactionStrategyClass)
.append("compactionStrategyOptions", compactionStrategyOptions)
- .append("compressorClass", compressionParameters.compressorClass)
- .append("compressionOptions", compressionParameters.compressionOptions)
+ .append("compressionOptions", compressionParameters.asThriftOptions())
.toString();
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java Tue Sep 6 09:33:33 2011
@@ -51,8 +51,8 @@ public class CompressedSequentialWriter
public CompressedSequentialWriter(File file, String indexFilePath, boolean skipIOCache, CompressionParameters parameters) throws IOException
{
- super(file, parameters.chunkLength, skipIOCache);
- this.compressor = parameters.compressor;
+ super(file, parameters.chunkLength(), skipIOCache);
+ this.compressor = parameters.sstableCompressor;
// buffer for compression should be the same size as buffer itself
compressed = new ICompressor.WrappedArray(new byte[compressor.initialCompressedBufferLength(buffer.length)]);
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java Tue Sep 6 09:33:33 2011
@@ -54,7 +54,7 @@ public class CompressionMetadata
int chunkLength = stream.readInt();
try
{
- parameters = new CompressionParameters(compressorName, options, chunkLength);
+ parameters = new CompressionParameters(compressorName, chunkLength, options);
}
catch (ConfigurationException e)
{
@@ -70,12 +70,12 @@ public class CompressionMetadata
public ICompressor compressor()
{
- return parameters.compressor;
+ return parameters.sstableCompressor;
}
public int chunkLength()
{
- return parameters.chunkLength;
+ return parameters.chunkLength();
}
/**
@@ -120,7 +120,7 @@ public class CompressionMetadata
public Chunk chunkFor(long position) throws IOException
{
// position of the chunk
- int idx = (int) (position / parameters.chunkLength);
+ int idx = (int) (position / parameters.chunkLength());
if (idx >= chunkOffsets.length)
throw new EOFException();
@@ -146,16 +146,16 @@ public class CompressionMetadata
public void writeHeader(CompressionParameters parameters) throws IOException
{
// algorithm
- writeUTF(parameters.compressorClass.getSimpleName());
- writeInt(parameters.compressionOptions.size());
- for (Map.Entry<String, String> entry : parameters.compressionOptions.entrySet())
+ writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
+ writeInt(parameters.otherOptions.size());
+ for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet())
{
writeUTF(entry.getKey());
writeUTF(entry.getValue());
}
// store the length of the chunk
- writeInt(parameters.chunkLength);
+ writeInt(parameters.chunkLength());
// store position and reserve a place for uncompressed data length and chunks count
dataLengthOffset = getFilePointer();
writeLong(-1);
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressionParameters.java Tue Sep 6 09:33:33 2011
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.avro.util.Utf8;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
@@ -32,44 +33,54 @@ import org.apache.cassandra.config.Confi
public class CompressionParameters
{
public final static int DEFAULT_CHUNK_LENGTH = 65536;
- public static final String CHUNK_LENGTH_PARAMETER = "chunk_length_kb";
- public final Class<? extends ICompressor> compressorClass;
- public final Map<String, String> compressionOptions;
+ public static final String SSTABLE_COMPRESSION = "sstable_compression";
+ public static final String CHUNK_LENGTH = "chunk_length_kb";
- public final transient ICompressor compressor;
- public final transient int chunkLength;
+ public final ICompressor sstableCompressor;
+ private final Integer chunkLength;
+ public final Map<String, String> otherOptions; // Unrecognized options, can be use by the compressor
- public CompressionParameters(CharSequence compressorClassName, Map<? extends CharSequence, ? extends CharSequence> options) throws ConfigurationException
+ public static CompressionParameters create(Map<? extends CharSequence, ? extends CharSequence> opts) throws ConfigurationException
{
- this(compressorClassName, copyOptions(options), -1);
+ Map<String, String> options = copyOptions(opts);
+ String sstableCompressionClass = options.get(SSTABLE_COMPRESSION);
+ String chunkLength = options.get(CHUNK_LENGTH);
+ options.remove(SSTABLE_COMPRESSION);
+ options.remove(CHUNK_LENGTH);
+ CompressionParameters cp = new CompressionParameters(sstableCompressionClass, parseChunkLength(chunkLength), options);
+ cp.validateChunkLength();
+ return cp;
}
- public CompressionParameters(CharSequence compressorClassName, Map<String, String> options, int chunkLength) throws ConfigurationException
+ public CompressionParameters(String sstableCompressorClass, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException
{
- this(createCompressor(parseCompressorClass(compressorClassName), options), options, chunkLength < 0 ? getChunkLength(options) : chunkLength);
- validateChunkLength();
+ this(createCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, otherOptions);
}
- public CompressionParameters(ICompressor compressor)
+ public CompressionParameters(ICompressor sstableCompressor)
{
- this(compressor, null, DEFAULT_CHUNK_LENGTH);
+ this(sstableCompressor, null, Collections.<String, String>emptyMap());
}
- public CompressionParameters(ICompressor compressor, Map<String, String> compressionOptions, int chunkLength)
+ public CompressionParameters(ICompressor sstableCompressor, Integer chunkLength, Map<String, String> otherOptions)
{
- this.compressorClass = compressor == null ? null : compressor.getClass();
- this.compressionOptions = compressor == null ? null : (compressionOptions == null ? Collections.<String, String>emptyMap() : compressionOptions);
+ this.sstableCompressor = sstableCompressor;
this.chunkLength = chunkLength;
- this.compressor = compressor;
+ this.otherOptions = otherOptions;
}
- private static Class<? extends ICompressor> parseCompressorClass(CharSequence cc) throws ConfigurationException
+ public int chunkLength()
{
- if (cc == null)
+ return chunkLength == null ? DEFAULT_CHUNK_LENGTH : chunkLength;
+ }
+
+
+ private static Class<? extends ICompressor> parseCompressorClass(String className) throws ConfigurationException
+ {
+ if (className == null)
return null;
- String className = cc.toString();
className = className.contains(".") ? className : "org.apache.cassandra.io.compress." + className;
try
{
@@ -77,7 +88,7 @@ public class CompressionParameters
}
catch (Exception e)
{
- throw new ConfigurationException("Could not create Compression for type " + cc.toString(), e);
+ throw new ConfigurationException("Could not create Compression for type " + className, e);
}
}
@@ -126,21 +137,19 @@ public class CompressionParameters
return compressionOptions;
}
- private static int getChunkLength(Map<String, String> options) throws ConfigurationException
+ private static Integer parseChunkLength(String chLength) throws ConfigurationException
{
- int chunkLength = DEFAULT_CHUNK_LENGTH;
- if (options != null && options.containsKey(CHUNK_LENGTH_PARAMETER))
+ if (chLength == null)
+ return null;
+
+ try
{
- try
- {
- chunkLength = Integer.parseInt(options.get(CHUNK_LENGTH_PARAMETER));
- }
- catch (NumberFormatException e)
- {
- throw new ConfigurationException("Invalid value for " + CHUNK_LENGTH_PARAMETER, e);
- }
+ return Integer.parseInt(chLength);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException("Invalid value for " + CHUNK_LENGTH, e);
}
- return chunkLength;
}
// chunkLength must be a power of 2 because we assume so when
@@ -148,8 +157,11 @@ public class CompressionParameters
// CompressedRandomAccessReader.decompresseChunk())
private void validateChunkLength() throws ConfigurationException
{
+ if (chunkLength == null)
+ return; // chunk length not set, this is fine, default will be used
+
if (chunkLength <= 0)
- throw new ConfigurationException("Invalid negative or null " + CHUNK_LENGTH_PARAMETER);
+ throw new ConfigurationException("Invalid negative or null " + CHUNK_LENGTH);
int c = chunkLength;
boolean found = false;
@@ -158,7 +170,7 @@ public class CompressionParameters
if ((c & 0x01) != 0)
{
if (found)
- throw new ConfigurationException(CHUNK_LENGTH_PARAMETER + " must be a power of 2");
+ throw new ConfigurationException(CHUNK_LENGTH + " must be a power of 2");
else
found = true;
}
@@ -166,6 +178,33 @@ public class CompressionParameters
}
}
+ public Map<CharSequence, CharSequence> asAvroOptions()
+ {
+ Map<CharSequence, CharSequence> options = new HashMap<CharSequence, CharSequence>();
+ for (Map.Entry<String, String> entry : otherOptions.entrySet())
+ options.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
+
+ if (sstableCompressor == null)
+ return options;
+
+ options.put(new Utf8(SSTABLE_COMPRESSION), new Utf8(sstableCompressor.getClass().getName()));
+ if (chunkLength != null)
+ options.put(new Utf8(CHUNK_LENGTH), new Utf8(chunkLength.toString()));
+ return options;
+ }
+
+ public Map<String, String> asThriftOptions()
+ {
+ Map<String, String> options = new HashMap<String, String>(otherOptions);
+ if (sstableCompressor == null)
+ return options;
+
+ options.put(SSTABLE_COMPRESSION, sstableCompressor.getClass().getName());
+ if (chunkLength != null)
+ options.put(CHUNK_LENGTH, chunkLength.toString());
+ return options;
+ }
+
@Override
public boolean equals(Object obj)
{
@@ -180,8 +219,9 @@ public class CompressionParameters
CompressionParameters cp = (CompressionParameters) obj;
return new EqualsBuilder()
- .append(compressorClass, cp.compressorClass)
- .append(compressionOptions, cp.compressionOptions)
+ .append(sstableCompressor, cp.sstableCompressor)
+ .append(chunkLength, cp.chunkLength)
+ .append(otherOptions, cp.otherOptions)
.isEquals();
}
@@ -189,8 +229,9 @@ public class CompressionParameters
public int hashCode()
{
return new HashCodeBuilder(29, 1597)
- .append(compressorClass)
- .append(compressionOptions)
+ .append(sstableCompressor)
+ .append(chunkLength)
+ .append(otherOptions)
.toHashCode();
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Sep 6 09:33:33 2011
@@ -64,7 +64,7 @@ public class SSTableWriter extends SSTab
private static Set<Component> components(CFMetaData metadata)
{
Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.STATS));
- if (metadata.compressionParameters().compressor != null)
+ if (metadata.compressionParameters().sstableCompressor != null)
components.add(Component.COMPRESSION_INFO);
return components;
}
Modified: cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml (original)
+++ cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml Tue Sep 6 09:33:33 2011
@@ -569,11 +569,14 @@ commands:
It is also valid to specify the fully-qualified class name to a class
that implements org.apache.cassandra.io.ICompressor.
- - compression_options: Optional additional options for compression.
- Options have the form [{key:value}], and may depends on the
- compression algorithm used. One generic option is chunk_length_kb
- that allows to specify the size of the chunk used by compression
- (default to 64, must be a power of 2).
+ - compression_options: Options related to compression.
+ Options have the form [{key:value}]. The main recognized option are:
+ - sstable_compression: the algorithm to use to compress sstables for
+ this column family. If none is provided, compression will not be
+ enabled. Supported values are SnappyCompressor, DeflateCompressor or
+ any custom compressor.
+ - chunk_length_kb: specify the size of the chunk used by sstable
+ compression (default to 64, must be a power of 2).
Examples:
create column family Super4
Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1165577&r1=1165576&r2=1165577&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java (original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java Tue Sep 6 09:33:33 2011
@@ -447,10 +447,13 @@ public class Session implements Serializ
// column family for standard columns
CfDef standardCfDef = new CfDef("Keyspace1", "Standard1");
+ Map<String, String> compressionOptions = new HashMap<String, String>();
+ if (compression != null)
+ compressionOptions.put("sstable_compression", compression);
standardCfDef.setComparator_type(DEFAULT_COMPARATOR)
.setDefault_validation_class(DEFAULT_VALIDATOR)
- .setCompression(compression);
+ .setCompression_options(compressionOptions);
if (indexType != null)
{
@@ -464,13 +467,13 @@ public class Session implements Serializ
superCfDef.setComparator_type(DEFAULT_COMPARATOR)
.setSubcomparator_type(DEFAULT_COMPARATOR)
.setDefault_validation_class(DEFAULT_VALIDATOR)
- .setCompression(compression);
+ .setCompression_options(compressionOptions);
// column family for standard counters
- CfDef counterCfDef = new CfDef("Keyspace1", "Counter1").setDefault_validation_class("CounterColumnType").setReplicate_on_write(replicateOnWrite).setCompression(compression);
+ CfDef counterCfDef = new CfDef("Keyspace1", "Counter1").setDefault_validation_class("CounterColumnType").setReplicate_on_write(replicateOnWrite).setCompression_options(compressionOptions);
// column family with counter super columns
- CfDef counterSuperCfDef = new CfDef("Keyspace1", "SuperCounter1").setDefault_validation_class("CounterColumnType").setReplicate_on_write(replicateOnWrite).setColumn_type("Super").setCompression(compression);
+ CfDef counterSuperCfDef = new CfDef("Keyspace1", "SuperCounter1").setDefault_validation_class("CounterColumnType").setReplicate_on_write(replicateOnWrite).setColumn_type("Super").setCompression_options(compressionOptions);
keyspace.setName("Keyspace1");
keyspace.setStrategy_class(replicationStrategy);