You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/04/05 20:18:20 UTC
svn commit: r1465082 - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase:
io/hfile/Compression.java regionserver/wal/SequenceFileLogWriter.java
Author: liyin
Date: Fri Apr 5 18:18:19 2013
New Revision: 1465082
URL: http://svn.apache.org/r1465082
Log:
[HBASE-8155] Allow Record Compression for HLogs
Author: nspiegelberg
Summary:
Hadoop already allows for native compression support with
SequenceFile. We should allow the user to enable this functionality
through the config. Besides preserving backwards compatibility & the existing
stabilized code path, allowing users to enable record-level WAL compression
will also allow support for value compression.
Preliminary results show 3x compression on LZO with minimal throughput
overhead. This got more attention recently because it subsequently
facilitates longer log retention for backup & replication purposes.
Test Plan: - mvn test -Dtest=HLog
Reviewers: aaiyer, liyintang, pritam, amirshim
Reviewed By: aaiyer
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D745149
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java?rev=1465082&r1=1465081&r2=1465082&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java Fri Apr 5 18:18:19 2013
@@ -88,7 +88,7 @@ public final class Compression {
lzoCodec = null;
}
@Override
- CompressionCodec getCodec(Configuration conf) {
+ public CompressionCodec getCodec(Configuration conf) {
if (lzoCodec == null) {
try {
Class<?> externalCodec = ClassLoader.getSystemClassLoader().loadClass(
@@ -106,7 +106,7 @@ public final class Compression {
private transient Object lock = new Object();
@Override
- DefaultCodec getCodec(Configuration conf) {
+ public DefaultCodec getCodec(Configuration conf) {
if (codec == null) {
synchronized (lock) {
if (codec == null) {
@@ -125,7 +125,7 @@ public final class Compression {
},
NONE("none") {
@Override
- DefaultCodec getCodec(Configuration conf) {
+ public DefaultCodec getCodec(Configuration conf) {
return null;
}
@Override
@@ -169,7 +169,7 @@ public final class Compression {
}
@SuppressWarnings("unchecked")
@Override
- CompressionCodec getCodec(Configuration conf) {
+ public CompressionCodec getCodec(Configuration conf) {
if (snappyCodec == null) {
synchronized (lock) {
if (snappyCodec == null) {
@@ -211,7 +211,7 @@ public final class Compression {
this.compressName = name;
}
- abstract CompressionCodec getCodec(Configuration conf);
+ public abstract CompressionCodec getCodec(Configuration conf);
public InputStream createDecompressionStream(
InputStream downStream, Decompressor decompressor,
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1465082&r1=1465081&r2=1465082&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Fri Apr 5 18:18:19 2013
@@ -28,10 +28,12 @@ import java.lang.reflect.Method;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
@@ -50,6 +52,9 @@ public class SequenceFileLogWriter imple
private OutputStream dfsClient_out;
// The syncFs method from hdfs-200 or null if not available.
private Method syncFs;
+ // The type of compression to apply to each transaction
+ private CompressionType compressionType;
+ private CompressionCodec codec;
public SequenceFileLogWriter() {
super();
@@ -58,6 +63,8 @@ public class SequenceFileLogWriter imple
@Override
public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
+ setCompression(conf);
+
// Create a SF.Writer instance.
try {
this.generateWriter(fs,path,conf);
@@ -78,8 +85,7 @@ public class SequenceFileLogWriter imple
fs.getDefaultReplication()),
conf.getLong("hbase.regionserver.hlog.blocksize",
fs.getDefaultBlockSize()),
- SequenceFile.CompressionType.NONE,
- new DefaultCodec(),
+ this.compressionType, this.codec,
null,
new Metadata());
} else {
@@ -163,12 +169,35 @@ public class SequenceFileLogWriter imple
return this.dfsClient_out;
}
- // To be backward compatible; we still need to call the old sequence file
- // interface.
- private void generateWriter(FileSystem fs, Path path, Configuration conf)
+ private void setCompression(Configuration conf) {
+ // compress each log record? (useful for non-trivial transaction systems)
+ String compressConf = conf.get("hbase.regionserver.hlog.compression");
+ if (compressConf != null) {
+ try {
+ CompressionCodec codec = Compression.getCompressionAlgorithmByName(
+ compressConf).getCodec(conf);
+ ((Configurable) codec).getConf().setInt("io.file.buffer.size",
+ 32 * 1024);
+ this.compressionType = SequenceFile.CompressionType.RECORD;
+ this.codec = codec;
+ return; // compression configured successfully; skip the uncompressed fallback
+ } catch (IllegalArgumentException iae) {
+ LOG.warn("Not compressing LogWriter", iae);
+ }
+ }
+
+ // default to fallback
+ this.compressionType = SequenceFile.CompressionType.NONE;
+ this.codec = new DefaultCodec();
+ }
+
+ // To be backward compatible; we still need to call the old sequence file
+ // interface.
+ private void generateWriter(FileSystem fs, Path path, Configuration conf)
throws InvocationTargetException, Exception {
- boolean forceSync =
- conf.getBoolean("hbase.regionserver.hlog.writer.forceSync", false);
+ boolean forceSync =
+ conf.getBoolean("hbase.regionserver.hlog.writer.forceSync", false);
+
if (forceSync) {
// call the new create api with force sync flag
this.writer = (SequenceFile.Writer) SequenceFile.class
@@ -186,7 +215,7 @@ public class SequenceFileLogWriter imple
new Long(conf.getLong("hbase.regionserver.hlog.blocksize",
fs.getDefaultBlockSize())),
new Boolean(false) /*createParent*/,
- SequenceFile.CompressionType.NONE, new DefaultCodec(),
+ this.compressionType, this.codec,
new Metadata(),
forceSync
});
@@ -209,7 +238,7 @@ public class SequenceFileLogWriter imple
new Long(conf.getLong("hbase.regionserver.hlog.blocksize",
fs.getDefaultBlockSize())),
new Boolean(false) /*createParent*/,
- SequenceFile.CompressionType.NONE, new DefaultCodec(),
+ this.compressionType, this.codec,
new Metadata()
});
}