You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by go...@apache.org on 2012/02/14 18:59:39 UTC

[2/2] git commit: Support compression using BulkWriter. Patch by goffinet, reviewed by Brandon Williams for CASSANDRA-3907

Support compression using BulkWriter
Patch by goffinet, reviewed by Brandon Williams for CASSANDRA-3907


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5dac2086
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5dac2086
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5dac2086

Branch: refs/heads/trunk
Commit: 5dac2086dcd4cb6845b814384d04952c986ac240
Parents: 648e62e
Author: Chris Goffinet <cg...@chrisgoffinet.com>
Authored: Tue Feb 14 09:58:15 2012 -0800
Committer: Chris Goffinet <cg...@chrisgoffinet.com>
Committed: Tue Feb 14 09:58:15 2012 -0800

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/hadoop/BulkRecordWriter.java  |    3 +-
 .../org/apache/cassandra/hadoop/ConfigHelper.java  |   40 +++++++++++++++
 .../io/sstable/SSTableSimpleUnsortedWriter.java    |   17 ++++++-
 4 files changed, 58 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dac2086/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9481f5e..e3207ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -76,6 +76,7 @@
  * Finish cleanup up tombstone purge code (CASSANDRA-3872)
  * Avoid NPE on aboarted stream-out sessions (CASSANDRA-3904)
  * BulkRecordWriter throws NPE for counter columns (CASSANDRA-3906)
+ * Support compression using BulkWriter (CASSANDRA-3907)
 
 1.0.8
  * fix race between cleanup and flush on secondary index CFSes (CASSANDRA-3712)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dac2086/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index aded15e..6f056c4 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -127,7 +127,8 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                     ConfigHelper.getOutputColumnFamily(conf),
                     BytesType.instance,
                     subcomparator,
-                    Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")));
+                    Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")),
+                    ConfigHelper.getOutputCompressionParamaters(conf));
             this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler());
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dac2086/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index bafb195..c9ab70e 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -22,8 +22,11 @@ package org.apache.cassandra.hadoop;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.cassandra.io.compress.CompressionParameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,6 +71,8 @@ public class ConfigHelper
     private static final String OUTPUT_INITIAL_THRIFT_ADDRESS = "cassandra.output.thrift.address";
     private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
     private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
+    private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class";
+    private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
     
     private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
 
@@ -406,6 +411,41 @@ public class ConfigHelper
         }
     }
 
+    public static String getOutputCompressionClass(Configuration conf)
+    {
+        return conf.get(OUTPUT_COMPRESSION_CLASS);
+    }
+
+    public static String getOutputCompressionChunkLength(Configuration conf)
+    {
+        return conf.get(OUTPUT_COMPRESSION_CHUNK_LENGTH, String.valueOf(CompressionParameters.DEFAULT_CHUNK_LENGTH));
+    }
+
+    public static void setOutputCompressionClass(Configuration conf, String classname)
+    {
+        conf.set(OUTPUT_COMPRESSION_CLASS, classname);
+    }
+
+    public static void setOutputCompressionChunkLength(Configuration conf, String length)
+    {
+        conf.set(OUTPUT_COMPRESSION_CHUNK_LENGTH, length);
+    }
+
+    public static CompressionParameters getOutputCompressionParamaters(Configuration conf)
+    {
+        if (getOutputCompressionClass(conf) == null)
+            return new CompressionParameters(null);
+
+        Map<String, String> options = new HashMap<String, String>();
+        options.put(CompressionParameters.SSTABLE_COMPRESSION, getOutputCompressionClass(conf));
+        options.put(CompressionParameters.CHUNK_LENGTH_KB, getOutputCompressionChunkLength(conf));
+
+        try {
+            return CompressionParameters.create(options);
+        } catch (ConfigurationException e) {
+            throw new RuntimeException(e);
+        }
+    }
 
     public static Cassandra.Client getClientFromInputAddressList(Configuration conf) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dac2086/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index eadc16d..b869d1e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.HeapAllocator;
 
@@ -75,13 +76,25 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
                                        String columnFamily,
                                        AbstractType<?> comparator,
                                        AbstractType<?> subComparator,
-                                       int bufferSizeInMB) throws IOException
+                                       int bufferSizeInMB,
+                                       CompressionParameters compressParameters) throws IOException
     {
-        super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator), partitioner);
+        super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator).compressionParameters(compressParameters), partitioner);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
         this.diskWriter.start();
     }
 
+    public SSTableSimpleUnsortedWriter(File directory,
+                                       IPartitioner partitioner,
+                                       String keyspace,
+                                       String columnFamily,
+                                       AbstractType<?> comparator,
+                                       AbstractType<?> subComparator,
+                                       int bufferSizeInMB) throws IOException
+    {
+        this(directory, partitioner, keyspace, columnFamily, comparator, subComparator, bufferSizeInMB, new CompressionParameters(null));
+    }
+
     protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
     {
         currentSize += key.key.remaining() + columnFamily.serializedSize() * 1.2;