You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/28 21:24:54 UTC
svn commit: r1378276 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/ipc/
test/java/org/apache/hadoop/hbase/util/
Author: mbautin
Date: Tue Aug 28 19:24:54 2012
New Revision: 1378276
URL: http://svn.apache.org/viewvc?rev=1378276&view=rev
Log:
[HBASE-5355] RPC compression HTable interface
Author: aurickq
Summary:
1. add the ability to change RPC compression on the fly
2. add an option to enable RPC tx/rx compression in the loadtest tool
3. bugfix: call returnCompressor/returnDecompressor to return codecs to the CodecPool
Test Plan: ran loadtest
Reviewers: kannan, kranganathan, aaiyer, nspiegelberg
Reviewed By: kannan
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D540377
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Aug 28 19:24:54 2012
@@ -169,7 +169,9 @@ public class HTable implements HTableInt
this.options = new HBaseRPCOptions ();
String compressionAlgo = conf.get(HConstants.HBASE_RPC_COMPRESSION_KEY);
if (compressionAlgo != null) {
- this.options.setRPCCompression(
+ this.options.setTxCompression(
+ Compression.getCompressionAlgorithmByName(compressionAlgo));
+ this.options.setRxCompression(
Compression.getCompressionAlgorithmByName(compressionAlgo));
}
}
@@ -1296,4 +1298,28 @@ public class HTable implements HTableInt
public String getTag () {
return this.options.getTag ();
}
+
+ /**
+ * Sets the compression algorithm used to send RPC calls to the server.
+ * @param alg compression algorithm
+ */
+ public void setTxCompression(Compression.Algorithm alg) {
+ this.options.setTxCompression(alg);
+ }
+
+ public Compression.Algorithm getTxCompression() {
+ return this.options.getTxCompression();
+ }
+
+ /**
+ * Sets the compression algorithm used to receive RPC responses from the server.
+ * @param alg compression algorithm
+ */
+ public void setRxCompression(Compression.Algorithm alg) {
+ this.options.setRxCompression(alg);
+ }
+
+ public Compression.Algorithm getRxCompression() {
+ return this.options.getRxCompression();
+ }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue Aug 28 19:24:54 2012
@@ -488,6 +488,7 @@ public class HBaseClient {
}
DataOutputStream uncompressedOS = null;
DataOutputStream outOS = null;
+ Compressor compressor = null;
try {
//noinspection SynchronizeOnNonFinalField
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
@@ -500,22 +501,19 @@ public class HBaseClient {
try {
// 1. write the call id uncompressed
uncompressedOS.writeInt(call.id);
-
// 2. write RPC options uncompressed
if (call.version >= HBaseServer.VERSION_RPCOPTIONS) {
call.options.write(outOS);
}
-
// preserve backwards compatibility
- if (call.options.getRPCCompression() != Compression.Algorithm.NONE) {
+ if (call.options.getTxCompression() != Compression.Algorithm.NONE) {
// 3. setup the compressor
- Compressor compressor = call.options.getRPCCompression().getCompressor();
+ compressor = call.options.getTxCompression().getCompressor();
OutputStream compressedOutputStream =
- call.options.getRPCCompression().createCompressionStream(
+ call.options.getTxCompression().createCompressionStream(
uncompressedOS, compressor, 0);
outOS = new DataOutputStream(compressedOutputStream);
}
-
// 4. write the output params with the correct compression type
call.param.write(outOS);
outOS.flush();
@@ -549,6 +547,9 @@ public class HBaseClient {
IOUtils.closeStream(outOS);
}
IOUtils.closeStream(uncompressedOS);
+ if (compressor != null) {
+ call.options.getTxCompression().returnCompressor(compressor);
+ }
}
}
@@ -560,6 +561,8 @@ public class HBaseClient {
return;
}
touch();
+ Compression.Algorithm rpcCompression = null;
+ Decompressor decompressor = null;
try {
DataInputStream localIn = in;
@@ -571,16 +574,16 @@ public class HBaseClient {
long totalTime = System.currentTimeMillis() - call.startTime;
// 2. read the error boolean uncompressed
boolean isError = localIn.readBoolean();
-
+
if (call.getVersion() >= HBaseServer.VERSION_RPCOPTIONS) {
// 3. read the compression type used for the rest of the response
String compressionAlgoName = localIn.readUTF();
- Compression.Algorithm rpcCompression =
+ rpcCompression =
Compression.getCompressionAlgorithmByName(compressionAlgoName);
// 4. setup the correct decompressor (if any)
if (rpcCompression != Compression.Algorithm.NONE) {
- Decompressor decompressor = rpcCompression.getDecompressor();
+ decompressor = rpcCompression.getDecompressor();
InputStream is = rpcCompression.createDecompressionStream(
in, decompressor, 0);
localIn = new DataInputStream(is);
@@ -622,6 +625,10 @@ public class HBaseClient {
markClosed(e);
} catch (Throwable te) {
markClosed((IOException)new IOException().initCause(te));
+ } finally {
+ if (decompressor != null) {
+ rpcCompression.returnDecompressor(decompressor);
+ }
}
}
@@ -934,7 +941,8 @@ public class HBaseClient {
}
// RPC compression is only supported from version 4, so make backward compatible
byte version = HBaseServer.CURRENT_VERSION;
- if (call.options.getRPCCompression() == Compression.Algorithm.NONE
+ if (call.options.getTxCompression() == Compression.Algorithm.NONE
+ && call.options.getRxCompression() == Compression.Algorithm.NONE
&& !call.options.getRequestProfiling ()
&& call.options.getTag () == null) {
version = HBaseServer.VERSION_3;
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java Tue Aug 28 19:24:54 2012
@@ -15,7 +15,8 @@ public class HBaseRPCOptions implements
private static final byte VERSION_INITIAL = 1;
private byte version = VERSION_INITIAL;
- private Compression.Algorithm compressionAlgo = Compression.Algorithm.NONE;
+ private Compression.Algorithm rxCompression = Compression.Algorithm.NONE;
+ private Compression.Algorithm txCompression = Compression.Algorithm.NONE;
private boolean requestProfiling = false;
private String tag = null;
@@ -33,12 +34,20 @@ public class HBaseRPCOptions implements
return this.version;
}
- public void setRPCCompression(Compression.Algorithm compressionAlgo) {
- this.compressionAlgo = compressionAlgo;
+ public void setRxCompression(Compression.Algorithm compressionAlgo) {
+ this.rxCompression = compressionAlgo;
}
- public Compression.Algorithm getRPCCompression() {
- return this.compressionAlgo;
+ public Compression.Algorithm getRxCompression() {
+ return this.rxCompression;
+ }
+
+ public void setTxCompression(Compression.Algorithm compressionAlgo) {
+ this.txCompression = compressionAlgo;
+ }
+
+ public Compression.Algorithm getTxCompression() {
+ return this.txCompression;
}
/**
@@ -75,10 +84,10 @@ public class HBaseRPCOptions implements
out.writeByte(this.version);
// 2. write the compression algo used to compress the request being sent
- out.writeUTF(this.compressionAlgo.getName());
+ out.writeUTF(this.txCompression.getName());
// 3. write the compression algo to use for the response
- out.writeUTF(this.compressionAlgo.getName());
+ out.writeUTF(this.rxCompression.getName());
// 4. write profiling request flag
out.writeBoolean(this.requestProfiling);
@@ -98,11 +107,10 @@ public class HBaseRPCOptions implements
throw new VersionMismatch("HBaseRPCOptions", this.version,
VERSION_INITIAL);
}
- String compressionName;
- compressionName = in.readUTF ();
- compressionName = in.readUTF (); // dummy read
- this.compressionAlgo = Compression.
- getCompressionAlgorithmByName(compressionName);
+ this.txCompression = Compression.
+ getCompressionAlgorithmByName(in.readUTF());
+ this.rxCompression = Compression.
+ getCompressionAlgorithmByName(in.readUTF());
this.requestProfiling = in.readBoolean();
this.tag = null;
if (in.readBoolean()) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Aug 28 19:24:54 2012
@@ -997,8 +997,8 @@ public abstract class HBaseServer {
private void processData() throws IOException, InterruptedException {
DataInputStream uncompressedIs =
new DataInputStream(new ByteArrayInputStream(data.array()));
- Compression.Algorithm rxCompression = Algorithm.NONE;
Compression.Algorithm txCompression = Algorithm.NONE;
+ Compression.Algorithm rxCompression = Algorithm.NONE;
DataInputStream dis = uncompressedIs;
// 1. read the call id uncompressed
@@ -1007,15 +1007,16 @@ public abstract class HBaseServer {
LOG.trace(" got #" + id);
HBaseRPCOptions options = new HBaseRPCOptions ();
+ Decompressor decompressor = null;
if (version >= VERSION_RPCOPTIONS) {
// 2. read rpc options uncompressed
options.readFields(dis);
- rxCompression = options.getRPCCompression();
- txCompression = options.getRPCCompression();
+ txCompression = options.getTxCompression(); // server receives this
+ rxCompression = options.getRxCompression(); // server responds with
// 3. set up a decompressor to read the rest of the request
- if (rxCompression != Compression.Algorithm.NONE) {
- Decompressor decompressor = rxCompression.getDecompressor();
- InputStream is = rxCompression.createDecompressionStream(
+ if (txCompression != Compression.Algorithm.NONE) {
+ decompressor = txCompression.getDecompressor();
+ InputStream is = txCompression.createDecompressionStream(
uncompressedIs, decompressor, 0);
dis = new DataInputStream(is);
}
@@ -1027,10 +1028,14 @@ public abstract class HBaseServer {
Call call = new Call(id, param, this);
call.shouldProfile = options.getRequestProfiling ();
- call.setRPCCompression(txCompression);
+ call.setRPCCompression(rxCompression);
call.setVersion(version);
call.setTag(options.getTag());
callQueue.put(call); // queue the call; maybe blocked here
+
+ if (decompressor != null) {
+ txCompression.returnDecompressor(decompressor);
+ }
}
protected synchronized void close() {
@@ -1127,6 +1132,7 @@ public abstract class HBaseServer {
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream rawOS = new DataOutputStream(buf);
DataOutputStream out = rawOS;
+ Compressor compressor = null;
// 1. write call id uncompressed
out.writeInt(call.id);
@@ -1140,7 +1146,7 @@ public abstract class HBaseServer {
// 4. create a compressed output stream if compression was enabled
if (call.getRPCCompression() != Compression.Algorithm.NONE) {
- Compressor compressor = call.getRPCCompression().getCompressor();
+ compressor = call.getRPCCompression().getCompressor();
OutputStream compressedOutputStream =
call.getRPCCompression().createCompressionStream(rawOS, compressor, 0);
out = new DataOutputStream(compressedOutputStream);
@@ -1168,6 +1174,9 @@ public abstract class HBaseServer {
buf.flush();
call.setResponse(buf.getByteBuffer());
responder.doRespond(call);
+ if (compressor != null) {
+ call.getRPCCompression().returnCompressor(compressor);
+ }
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.warn(getName() + " caught: " +
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Tue Aug 28 19:24:54 2012
@@ -96,6 +96,7 @@ public class LoadTestTool extends Abstra
private static final String OPT_TABLE_NAME = "tn";
private static final String OPT_ZK_QUORUM = "zk";
private static final String OPT_PROFILING = "profiling";
+ private static final String OPT_RPC_COMPRESSION = "rpc_compression";
private static final long DEFAULT_START_KEY = 0;
@@ -114,7 +115,11 @@ public class LoadTestTool extends Abstra
private boolean encodeInCacheOnly;
private Compression.Algorithm compressAlgo;
private StoreFile.BloomType bloomType;
-
+
+ // RPC options
+ private Compression.Algorithm txCompression = Compression.Algorithm.NONE;
+ private Compression.Algorithm rxCompression = Compression.Algorithm.NONE;
+
// Writer options
private int numWriterThreads = DEFAULT_NUM_THREADS;
private long minColsPerKey, maxColsPerKey;
@@ -200,6 +205,8 @@ public class LoadTestTool extends Abstra
DEFAULT_START_KEY + ".");
addOptWithArg(OPT_PROFILING, "Percent of reads/writes to request " +
"profiling data");
+ addOptWithArg(OPT_RPC_COMPRESSION, "RPC compression to use " +
+ "<tx_compression>:<rx_compression>");
}
@Override
@@ -278,6 +285,17 @@ public class LoadTestTool extends Abstra
System.out.println ("Requesting profiling data on " + profilePercent +
"% of reads/writes");
}
+
+ if (cmd.hasOption(OPT_RPC_COMPRESSION)) {
+ String [] comp = this.splitColonSeparated(OPT_RPC_COMPRESSION, 2, 2);
+ this.txCompression = Compression.Algorithm.
+ valueOf(comp[0]);
+ this.rxCompression = Compression.Algorithm.
+ valueOf(comp[1]);
+
+ System.out.println ("txCompression: " + comp[0]);
+ System.out.println ("rxCompression: " + comp[1]);
+ }
System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
}
@@ -312,7 +330,7 @@ public class LoadTestTool extends Abstra
if (isWrite) {
writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY,
- profilePercent);
+ profilePercent, this.txCompression, this.rxCompression);
writerThreads.setMultiPut(isMultiPut);
writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey);
writerThreads.setDataSize(minColDataSize, maxColDataSize);
@@ -320,7 +338,7 @@ public class LoadTestTool extends Abstra
if (isRead) {
readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY,
- verifyPercent, profilePercent);
+ verifyPercent, profilePercent, this.txCompression, this.rxCompression);
readerThreads.setMaxErrors(maxReadErrors);
readerThreads.setKeyWindow(keyWindow);
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java Tue Aug 28 19:24:54 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.hfile.Compression;
/** Creates multiple threads that read and verify previously written data */
public class MultiThreadedReader extends MultiThreadedAction
@@ -71,17 +72,26 @@ public class MultiThreadedReader extends
private int maxErrors = DEFAULT_MAX_ERRORS;
private int keyWindow = DEFAULT_KEY_WINDOW;
+
+ /** RPC compression */
+ private Compression.Algorithm txCompression;
+ private Compression.Algorithm rxCompression;
public MultiThreadedReader(Configuration conf, byte[] tableName,
byte[] columnFamily, double verifyPercent) {
- this (conf, tableName, columnFamily, verifyPercent, 0);
+ this (conf, tableName, columnFamily, verifyPercent, 0,
+ Compression.Algorithm.NONE, Compression.Algorithm.NONE);
}
public MultiThreadedReader(Configuration conf, byte[] tableName,
- byte[] columnFamily, double verifyPercent, double profilePercent) {
+ byte[] columnFamily, double verifyPercent, double profilePercent,
+ Compression.Algorithm txCompression,
+ Compression.Algorithm rxCompression) {
super(conf, tableName, columnFamily, "R");
this.verifyPercent = verifyPercent;
this.profilePercent = profilePercent;
+ this.txCompression = txCompression;
+ this.rxCompression = rxCompression;
}
public void linkToWriter(MultiThreadedWriter writer) {
@@ -133,6 +143,8 @@ public class MultiThreadedReader extends
public HBaseReaderThread(int readerId) throws IOException {
this.readerId = readerId;
table = new HTable(conf, tableName);
+ table.setTxCompression(txCompression);
+ table.setRxCompression(rxCompression);
setName(getClass().getSimpleName() + "_" + readerId);
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Tue Aug 28 19:24:54 2012
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.hfile.Compression;
/** Creates multiple threads that write key/values into the */
public class MultiThreadedWriter extends MultiThreadedAction {
@@ -79,15 +80,24 @@ public class MultiThreadedWriter extends
/** Enable this if used in conjunction with a concurrent reader. */
private boolean trackInsertedKeys;
+ /** RPC compression */
+ private Compression.Algorithm txCompression;
+ private Compression.Algorithm rxCompression;
+
public MultiThreadedWriter(Configuration conf, byte[] tableName,
byte[] columnFamily) {
- this (conf, tableName, columnFamily, 0);
+ this (conf, tableName, columnFamily, 0,
+ Compression.Algorithm.NONE, Compression.Algorithm.NONE);
}
public MultiThreadedWriter(Configuration conf, byte[] tableName,
- byte[] columnFamily, double profilePercent) {
+ byte[] columnFamily, double profilePercent,
+ Compression.Algorithm txCompression,
+ Compression.Algorithm rxCompression) {
super(conf, tableName, columnFamily, "W");
this.profilePercent = profilePercent;
+ this.txCompression = txCompression;
+ this.rxCompression = rxCompression;
}
/** Use multi-puts vs. separate puts for every column in a row */
@@ -139,6 +149,8 @@ public class MultiThreadedWriter extends
public HBaseWriterThread(int writerId) throws IOException {
setName(getClass().getSimpleName() + "_" + writerId);
table = new HTable(conf, tableName);
+ table.setTxCompression(txCompression);
+ table.setRxCompression(rxCompression);
}
public void run() {