Posted to commits@hbase.apache.org by li...@apache.org on 2013/04/05 20:18:20 UTC

svn commit: r1465082 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase: io/hfile/Compression.java regionserver/wal/SequenceFileLogWriter.java

Author: liyin
Date: Fri Apr  5 18:18:19 2013
New Revision: 1465082

URL: http://svn.apache.org/r1465082
Log:
[HBASE-8155] Allow Record Compression for HLogs

Author: nspiegelberg

Summary:
Hadoop already provides native compression support for SequenceFile. We
should allow the user to enable this functionality through the config.
Besides maintaining backwards compatibility and reusing existing,
stabilized code, letting users enable record-level compression for the
WAL adds support for value compression. Preliminary results show 3x
compression with LZO and minimal impact on throughput. This has drawn
more attention recently because it in turn facilitates longer log
retention for backup & replication purposes.
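
For reference, the feature is opt-in through a new key, read by
setCompression() below. An operator would set something like the
following in hbase-site.xml (the value must be a name known to
Compression.getCompressionAlgorithmByName, e.g. "lzo" or "gz"; "lzo"
assumes the LZO codec is installed on the cluster):

  <property>
    <name>hbase.regionserver.hlog.compression</name>
    <value>lzo</value>
  </property>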

Test Plan: - mvn test -Dtest=HLog

Reviewers: aaiyer, liyintang, pritam, amirshim

Reviewed By: aaiyer

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D745149

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java?rev=1465082&r1=1465081&r2=1465082&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java Fri Apr  5 18:18:19 2013
@@ -88,7 +88,7 @@ public final class Compression {
         lzoCodec = null;
       }
       @Override
-      CompressionCodec getCodec(Configuration conf) {
+      public CompressionCodec getCodec(Configuration conf) {
         if (lzoCodec == null) {
           try {
             Class<?> externalCodec = ClassLoader.getSystemClassLoader().loadClass(
@@ -106,7 +106,7 @@ public final class Compression {
       private transient Object lock = new Object();
 
       @Override
-      DefaultCodec getCodec(Configuration conf) {
+      public DefaultCodec getCodec(Configuration conf) {
         if (codec == null) {
           synchronized (lock) {
             if (codec == null) {
@@ -125,7 +125,7 @@ public final class Compression {
     },
     NONE("none") {
       @Override
-      DefaultCodec getCodec(Configuration conf) {
+      public DefaultCodec getCodec(Configuration conf) {
         return null;
       }
       @Override
@@ -169,7 +169,7 @@ public final class Compression {
       }
       @SuppressWarnings("unchecked")
       @Override
-      CompressionCodec getCodec(Configuration conf) {
+      public CompressionCodec getCodec(Configuration conf) {
         if (snappyCodec == null) {
           synchronized (lock) {
             if (snappyCodec == null) {
@@ -211,7 +211,7 @@ public final class Compression {
       this.compressName = name;
     }
 
-    abstract CompressionCodec getCodec(Configuration conf);
+    public abstract CompressionCodec getCodec(Configuration conf);
 
     public InputStream createDecompressionStream(
         InputStream downStream, Decompressor decompressor,

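Making getCodec() public lets code outside org.apache.hadoop.hbase.io.hfile
resolve a codec from a configured name; the WAL writer below relies on
this. A minimal sketch of the call pattern (the CodecLookup helper is
illustrative, not part of this commit):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.io.hfile.Compression;
  import org.apache.hadoop.io.compress.CompressionCodec;

  public class CodecLookup {
    // Resolve a codec from a configured name such as "lzo" or "gz".
    // An unrecognized name throws IllegalArgumentException, which
    // setCompression() below catches to fall back to no compression.
    public static CompressionCodec forName(Configuration conf, String name) {
      return Compression.getCompressionAlgorithmByName(name).getCodec(conf);
    }
  }
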
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1465082&r1=1465081&r2=1465082&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Fri Apr  5 18:18:19 2013
@@ -28,10 +28,12 @@ import java.lang.reflect.Method;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
@@ -50,6 +52,9 @@ public class SequenceFileLogWriter imple
   private OutputStream dfsClient_out;
   // The syncFs method from hdfs-200 or null if not available.
   private Method syncFs;
+  // The type of compression to apply to each transaction
+  private CompressionType compressionType;
+  private CompressionCodec codec;
 
   public SequenceFileLogWriter() {
     super();
@@ -58,6 +63,8 @@ public class SequenceFileLogWriter imple
   @Override
   public void init(FileSystem fs, Path path, Configuration conf) 
   throws IOException {
+    setCompression(conf);
+
     // Create a SF.Writer instance.
     try {
     	this.generateWriter(fs,path,conf);
@@ -78,8 +85,7 @@ public class SequenceFileLogWriter imple
           fs.getDefaultReplication()),
         conf.getLong("hbase.regionserver.hlog.blocksize",
           fs.getDefaultBlockSize()),
-        SequenceFile.CompressionType.NONE,
-        new DefaultCodec(),
+        this.compressionType, this.codec,
         null,
         new Metadata());
     } else {
@@ -163,12 +169,35 @@ public class SequenceFileLogWriter imple
     return this.dfsClient_out;
   }
   
-  // To be backward compatible; we still need to call the old sequence file 
-  // interface. 
-  private void generateWriter(FileSystem fs, Path path, Configuration conf) 
+  private void setCompression(Configuration conf) {
+    // compress each log record? (useful for non-trivial transaction systems)
+    String compressConf = conf.get("hbase.regionserver.hlog.compression");
+    if (compressConf != null) {
+      try {
+        CompressionCodec codec = Compression.getCompressionAlgorithmByName(
+            compressConf).getCodec(conf);
+        ((Configurable) codec).getConf().setInt("io.file.buffer.size",
+            32 * 1024);
+        this.compressionType = SequenceFile.CompressionType.RECORD;
+        this.codec = codec;
+        return; // record compression configured; skip the fallback
+      } catch (IllegalArgumentException iae) {
+        LOG.warn("Not compressing LogWriter", iae);
+      }
+    }
+
+    // fall back to no compression
+    this.compressionType = SequenceFile.CompressionType.NONE;
+    this.codec = new DefaultCodec();
+  }
+
+  // To be backward compatible; we still need to call the old sequence file
+  // interface.
+  private void generateWriter(FileSystem fs, Path path, Configuration conf)
   throws InvocationTargetException, Exception {
-  	boolean forceSync = 
-  		conf.getBoolean("hbase.regionserver.hlog.writer.forceSync", false);
+       boolean forceSync =
+               conf.getBoolean("hbase.regionserver.hlog.writer.forceSync", false);
+
   	if (forceSync) {
       // call the new create api with force sync flag
       this.writer = (SequenceFile.Writer) SequenceFile.class
@@ -186,7 +215,7 @@ public class SequenceFileLogWriter imple
             new Long(conf.getLong("hbase.regionserver.hlog.blocksize",
                 fs.getDefaultBlockSize())),
             new Boolean(false) /*createParent*/,
-            SequenceFile.CompressionType.NONE, new DefaultCodec(),
+            this.compressionType, this.codec,
             new Metadata(),
             forceSync
             });
@@ -209,7 +238,7 @@ public class SequenceFileLogWriter imple
             new Long(conf.getLong("hbase.regionserver.hlog.blocksize",
                 fs.getDefaultBlockSize())),
             new Boolean(false) /*createParent*/,
-            SequenceFile.CompressionType.NONE, new DefaultCodec(),
+            this.compressionType, this.codec,
             new Metadata()
             });
   	}
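
Note that no matching reader change is required: SequenceFile stores the
compression type and codec class in the file header, so existing HLog
readers open record-compressed logs transparently. A minimal sketch
(assumes fs, path, and conf are in scope):

  // The reader discovers the codec from the file header; decompression
  // of each record is handled internally.
  SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
  try {
    System.out.println("compressed: " + reader.isCompressed());
  } finally {
    reader.close();
  }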