You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by go...@apache.org on 2012/02/14 18:59:39 UTC
[2/2] git commit: Support compression using BulkWriter Patch by
goffinet, reviewed by Brandon Williams for CASSANDRA-3907
Support compression using BulkWriter
Patch by goffinet, reviewed by Brandon Williams for CASSANDRA-3907
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5dac2086
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5dac2086
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5dac2086
Branch: refs/heads/trunk
Commit: 5dac2086dcd4cb6845b814384d04952c986ac240
Parents: 648e62e
Author: Chris Goffinet <cg...@chrisgoffinet.com>
Authored: Tue Feb 14 09:58:15 2012 -0800
Committer: Chris Goffinet <cg...@chrisgoffinet.com>
Committed: Tue Feb 14 09:58:15 2012 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/hadoop/BulkRecordWriter.java | 3 +-
.../org/apache/cassandra/hadoop/ConfigHelper.java | 40 +++++++++++++++
.../io/sstable/SSTableSimpleUnsortedWriter.java | 17 ++++++-
4 files changed, 58 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dac2086/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9481f5e..e3207ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -76,6 +76,7 @@
* Finish cleanup up tombstone purge code (CASSANDRA-3872)
* Avoid NPE on aboarted stream-out sessions (CASSANDRA-3904)
* BulkRecordWriter throws NPE for counter columns (CASSANDRA-3906)
+ * Support compression using BulkWriter (CASSANDRA-3907)
1.0.8
* fix race between cleanup and flush on secondary index CFSes (CASSANDRA-3712)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dac2086/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index aded15e..6f056c4 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -127,7 +127,8 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
ConfigHelper.getOutputColumnFamily(conf),
BytesType.instance,
subcomparator,
- Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")));
+ Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")),
+ ConfigHelper.getOutputCompressionParamaters(conf));
this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dac2086/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index bafb195..c9ab70e 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -22,8 +22,11 @@ package org.apache.cassandra.hadoop;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.cassandra.io.compress.CompressionParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +71,8 @@ public class ConfigHelper
private static final String OUTPUT_INITIAL_THRIFT_ADDRESS = "cassandra.output.thrift.address";
private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
+ // Fully-qualified sstable compressor class for bulk output; when unset,
+ // getOutputCompressionParamaters returns new CompressionParameters(null).
+ private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class";
+ // Compression chunk length, passed to CompressionParameters under CHUNK_LENGTH_KB (i.e. in KB).
+ private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
@@ -406,6 +411,41 @@ public class ConfigHelper
}
}
+ /**
+  * Returns the sstable compressor class configured for bulk output via
+  * {@link #setOutputCompressionClass}, or {@code null} if none has been set.
+  */
+ public static String getOutputCompressionClass(Configuration conf)
+ {
+     return conf.get(OUTPUT_COMPRESSION_CLASS);
+ }
+
+ /**
+  * Returns the configured compression chunk length in kilobytes, as a string.
+  * <p>
+  * The value is handed to {@link CompressionParameters#create} under
+  * {@link CompressionParameters#CHUNK_LENGTH_KB}, which is interpreted in KB. The default
+  * must therefore be in KB too: {@link CompressionParameters#DEFAULT_CHUNK_LENGTH} is a byte
+  * count, so passing it through unchanged would request chunks 1024 times too large.
+  */
+ public static String getOutputCompressionChunkLength(Configuration conf)
+ {
+     return conf.get(OUTPUT_COMPRESSION_CHUNK_LENGTH, String.valueOf(CompressionParameters.DEFAULT_CHUNK_LENGTH / 1024));
+ }
+
+ /**
+  * Sets the fully-qualified sstable compressor class used for bulk output.
+  * Leaving it unset makes {@link #getOutputCompressionParamaters} return
+  * {@code new CompressionParameters(null)}.
+  */
+ public static void setOutputCompressionClass(Configuration conf, String classname)
+ {
+     conf.set(OUTPUT_COMPRESSION_CLASS, classname);
+ }
+
+ /**
+  * Sets the compression chunk length for bulk output, in kilobytes (for example {@code "64"}).
+  */
+ public static void setOutputCompressionChunkLength(Configuration conf, String length)
+ {
+     conf.set(OUTPUT_COMPRESSION_CHUNK_LENGTH, length);
+ }
+
+ /**
+  * Builds the {@link CompressionParameters} for bulk output from the job configuration.
+  * <p>
+  * The method name is misspelled ("Paramaters"); it is kept as-is because BulkRecordWriter
+  * calls it by this name.
+  *
+  * @return {@code new CompressionParameters(null)} when no compressor class is configured,
+  *         otherwise parameters built from the configured class and chunk length (KB)
+  * @throws RuntimeException wrapping the ConfigurationException if the options are rejected
+  */
+ public static CompressionParameters getOutputCompressionParamaters(Configuration conf)
+ {
+     String compressionClass = getOutputCompressionClass(conf);
+     if (compressionClass == null)
+         return new CompressionParameters(null);
+
+     Map<String, String> options = new HashMap<String, String>(2);
+     options.put(CompressionParameters.SSTABLE_COMPRESSION, compressionClass);
+     options.put(CompressionParameters.CHUNK_LENGTH_KB, getOutputCompressionChunkLength(conf));
+
+     try
+     {
+         return CompressionParameters.create(options);
+     }
+     catch (ConfigurationException e)
+     {
+         throw new RuntimeException("Invalid output compression options " + options, e);
+     }
+ }
public static Cassandra.Client getClientFromInputAddressList(Configuration conf) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dac2086/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index eadc16d..b869d1e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.HeapAllocator;
@@ -75,13 +76,25 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
String columnFamily,
AbstractType<?> comparator,
AbstractType<?> subComparator,
- int bufferSizeInMB) throws IOException
+ int bufferSizeInMB,
+ CompressionParameters compressParameters) throws IOException
{
- super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator), partitioner);
+ super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator).compressionParameters(compressParameters), partitioner);
this.bufferSize = bufferSizeInMB * 1024L * 1024L;
this.diskWriter.start();
}
+ /**
+  * Convenience constructor that keeps the signature used before compression support was
+  * added. It delegates to the full constructor, passing {@code new CompressionParameters(null)}
+  * as the compression settings.
+  */
+ public SSTableSimpleUnsortedWriter(File directory, IPartitioner partitioner,
+                                    String keyspace, String columnFamily,
+                                    AbstractType<?> comparator, AbstractType<?> subComparator,
+                                    int bufferSizeInMB) throws IOException
+ {
+     this(directory, partitioner, keyspace, columnFamily, comparator, subComparator, bufferSizeInMB, noCompressionParameters());
+ }
+
+ /** Compression settings used when the caller supplies none: {@code new CompressionParameters(null)}. */
+ private static CompressionParameters noCompressionParameters()
+ {
+     return new CompressionParameters(null);
+ }
+
protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
{
currentSize += key.key.remaining() + columnFamily.serializedSize() * 1.2;