You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/28 21:24:54 UTC

svn commit: r1378276 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/ipc/ test/java/org/apache/hadoop/hbase/util/

Author: mbautin
Date: Tue Aug 28 19:24:54 2012
New Revision: 1378276

URL: http://svn.apache.org/viewvc?rev=1378276&view=rev
Log:
[HBASE-5355] RPC compression HTable interface

Author: aurickq

Summary:
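1. Add the ability to change RPC compression on the fly, with separate tx/rx settings exposed on HTable
2. Add an rpc_compression option (<tx_compression>:<rx_compression>) to the load test tool to enable RPC tx/rx compression
3. Bugfix: call returnCompressor/returnDecompressor to return codecs to the CodecPool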

Test Plan: ran loadtest

Reviewers: kannan, kranganathan, aaiyer, nspiegelberg

Reviewed By: kannan

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D540377

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Aug 28 19:24:54 2012
@@ -169,7 +169,9 @@ public class HTable implements HTableInt
     this.options = new HBaseRPCOptions ();
     String compressionAlgo = conf.get(HConstants.HBASE_RPC_COMPRESSION_KEY);
     if (compressionAlgo != null) {
-      this.options.setRPCCompression(
+      this.options.setTxCompression(
+          Compression.getCompressionAlgorithmByName(compressionAlgo));
+      this.options.setRxCompression(
           Compression.getCompressionAlgorithmByName(compressionAlgo));
     }
   }
@@ -1296,4 +1298,28 @@ public class HTable implements HTableInt
   public String getTag () {
     return this.options.getTag ();
   }
+  
+  /**
+   * set compression used to send RPC calls to the server
+   * @param alg compression algorithm
+   */
+  public void setTxCompression(Compression.Algorithm alg) {
+    this.options.setTxCompression(alg);
+  }
+  
+  public Compression.Algorithm getTxCompression() {
+    return this.options.getTxCompression();
+  }
+  
+  /**
+   * set compression used to receive RPC responses from the server
+   * @param alg compression algorithm
+   */
+  public void setRxCompression(Compression.Algorithm alg) {
+    this.options.setRxCompression(alg);
+  }
+  
+  public Compression.Algorithm getRxCompression() {
+    return this.options.getRxCompression();
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue Aug 28 19:24:54 2012
@@ -488,6 +488,7 @@ public class HBaseClient {
       }
       DataOutputStream uncompressedOS = null;
       DataOutputStream outOS = null;
+      Compressor compressor = null;
       try {
         //noinspection SynchronizeOnNonFinalField
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
@@ -500,22 +501,19 @@ public class HBaseClient {
           try {
             // 1. write the call id uncompressed
             uncompressedOS.writeInt(call.id);
-         
             // 2. write RPC options uncompressed
             if (call.version >= HBaseServer.VERSION_RPCOPTIONS) {
               call.options.write(outOS);
             }
-
             // preserve backwards compatibility
-            if (call.options.getRPCCompression() != Compression.Algorithm.NONE) {
+            if (call.options.getTxCompression() != Compression.Algorithm.NONE) {
               // 3. setup the compressor
-              Compressor compressor = call.options.getRPCCompression().getCompressor();
+              compressor = call.options.getTxCompression().getCompressor();
               OutputStream compressedOutputStream =
-                call.options.getRPCCompression().createCompressionStream(
+                call.options.getTxCompression().createCompressionStream(
                   uncompressedOS, compressor, 0);
               outOS = new DataOutputStream(compressedOutputStream);
             }
-
             // 4. write the output params with the correct compression type
             call.param.write(outOS);
             outOS.flush();
@@ -549,6 +547,9 @@ public class HBaseClient {
           IOUtils.closeStream(outOS);
         }
         IOUtils.closeStream(uncompressedOS);
+        if (compressor != null) {
+          call.options.getTxCompression().returnCompressor(compressor);
+        }
       }
     }
 
@@ -560,6 +561,8 @@ public class HBaseClient {
         return;
       }
       touch();
+      Compression.Algorithm rpcCompression = null;
+      Decompressor decompressor = null;
       try {
         DataInputStream localIn = in;
 
@@ -571,16 +574,16 @@ public class HBaseClient {
         long totalTime = System.currentTimeMillis() - call.startTime;
         // 2. read the error boolean uncompressed
         boolean isError = localIn.readBoolean();
-
+        
         if (call.getVersion() >= HBaseServer.VERSION_RPCOPTIONS) {
           // 3. read the compression type used for the rest of the response
           String compressionAlgoName = localIn.readUTF();
-          Compression.Algorithm rpcCompression =
+          rpcCompression =  
             Compression.getCompressionAlgorithmByName(compressionAlgoName);
 
           // 4. setup the correct decompressor (if any)
           if (rpcCompression != Compression.Algorithm.NONE) {
-            Decompressor decompressor = rpcCompression.getDecompressor();
+            decompressor = rpcCompression.getDecompressor();
             InputStream is = rpcCompression.createDecompressionStream(
                   in, decompressor, 0);
             localIn = new DataInputStream(is);
@@ -622,6 +625,10 @@ public class HBaseClient {
         markClosed(e);
       } catch (Throwable te) {
         markClosed((IOException)new IOException().initCause(te));
+      } finally {
+        if (decompressor != null) {
+          rpcCompression.returnDecompressor(decompressor);
+        }
       }
     }
 
@@ -934,7 +941,8 @@ public class HBaseClient {
     }
     // RPC compression is only supported from version 4, so make backward compatible
     byte version = HBaseServer.CURRENT_VERSION;
-    if (call.options.getRPCCompression() == Compression.Algorithm.NONE
+    if (call.options.getTxCompression() == Compression.Algorithm.NONE
+        && call.options.getRxCompression() == Compression.Algorithm.NONE
         && !call.options.getRequestProfiling ()
         && call.options.getTag () == null) {
       version = HBaseServer.VERSION_3;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java Tue Aug 28 19:24:54 2012
@@ -15,7 +15,8 @@ public class HBaseRPCOptions implements 
   private static final byte VERSION_INITIAL = 1;
 	
   private byte version = VERSION_INITIAL;
-  private Compression.Algorithm compressionAlgo = Compression.Algorithm.NONE;
+  private Compression.Algorithm rxCompression = Compression.Algorithm.NONE;
+  private Compression.Algorithm txCompression = Compression.Algorithm.NONE;
 	private boolean requestProfiling = false;
 	private String tag = null;
 	
@@ -33,12 +34,20 @@ public class HBaseRPCOptions implements 
     return this.version;
   }
 	
-	public void setRPCCompression(Compression.Algorithm compressionAlgo) {
-    this.compressionAlgo = compressionAlgo;
+	public void setRxCompression(Compression.Algorithm compressionAlgo) {
+    this.rxCompression = compressionAlgo;
   }
 
-  public Compression.Algorithm getRPCCompression() {
-    return this.compressionAlgo;
+  public Compression.Algorithm getRxCompression() {
+    return this.rxCompression;
+  }
+  
+  public void setTxCompression(Compression.Algorithm compressionAlgo) {
+    this.txCompression = compressionAlgo;
+  }
+
+  public Compression.Algorithm getTxCompression() {
+    return this.txCompression;
   }
 	
   /**
@@ -75,10 +84,10 @@ public class HBaseRPCOptions implements 
 	  out.writeByte(this.version);
 	  
 	  // 2. write the compression algo used to compress the request being sent
-    out.writeUTF(this.compressionAlgo.getName());
+    out.writeUTF(this.txCompression.getName());
     
     // 3. write the compression algo to use for the response
-    out.writeUTF(this.compressionAlgo.getName());
+    out.writeUTF(this.rxCompression.getName());
     
     // 4. write profiling request flag
 	  out.writeBoolean(this.requestProfiling);
@@ -98,11 +107,10 @@ public class HBaseRPCOptions implements 
       throw new VersionMismatch("HBaseRPCOptions", this.version,
           VERSION_INITIAL);
     }
-    String compressionName;
-    compressionName = in.readUTF ();
-    compressionName = in.readUTF ();          // dummy read
-    this.compressionAlgo = Compression.
-        getCompressionAlgorithmByName(compressionName);
+    this.txCompression = Compression.
+        getCompressionAlgorithmByName(in.readUTF());
+    this.rxCompression = Compression.
+        getCompressionAlgorithmByName(in.readUTF());
     this.requestProfiling = in.readBoolean();
     this.tag = null;
     if (in.readBoolean()) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Aug 28 19:24:54 2012
@@ -997,8 +997,8 @@ public abstract class HBaseServer {
     private void processData() throws  IOException, InterruptedException {
       DataInputStream uncompressedIs =
         new DataInputStream(new ByteArrayInputStream(data.array()));
-      Compression.Algorithm rxCompression = Algorithm.NONE;
       Compression.Algorithm txCompression = Algorithm.NONE;
+      Compression.Algorithm rxCompression = Algorithm.NONE;
       DataInputStream dis = uncompressedIs;
 
       // 1. read the call id uncompressed
@@ -1007,15 +1007,16 @@ public abstract class HBaseServer {
         LOG.trace(" got #" + id);
       
       HBaseRPCOptions options = new HBaseRPCOptions ();
+      Decompressor decompressor = null;
       if (version >= VERSION_RPCOPTIONS) {
         // 2. read rpc options uncompressed
         options.readFields(dis);
-        rxCompression = options.getRPCCompression();
-        txCompression = options.getRPCCompression();
+        txCompression = options.getTxCompression();   // server receives this
+        rxCompression = options.getRxCompression();   // server responds with
         // 3. set up a decompressor to read the rest of the request
-        if (rxCompression != Compression.Algorithm.NONE) {
-          Decompressor decompressor = rxCompression.getDecompressor();
-          InputStream is = rxCompression.createDecompressionStream(
+        if (txCompression != Compression.Algorithm.NONE) {
+          decompressor = txCompression.getDecompressor();
+          InputStream is = txCompression.createDecompressionStream(
               uncompressedIs, decompressor, 0);
           dis = new DataInputStream(is);
         }
@@ -1027,10 +1028,14 @@ public abstract class HBaseServer {
       Call call = new Call(id, param, this);
       call.shouldProfile = options.getRequestProfiling ();
       
-      call.setRPCCompression(txCompression);
+      call.setRPCCompression(rxCompression);
       call.setVersion(version);
       call.setTag(options.getTag());
       callQueue.put(call);              // queue the call; maybe blocked here
+      
+      if (decompressor != null) {
+        txCompression.returnDecompressor(decompressor);
+      }
     }
 
     protected synchronized void close() {
@@ -1127,6 +1132,7 @@ public abstract class HBaseServer {
           ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
           DataOutputStream rawOS = new DataOutputStream(buf);
           DataOutputStream out = rawOS;
+          Compressor compressor = null;
 
           // 1. write call id uncompressed
           out.writeInt(call.id);
@@ -1140,7 +1146,7 @@ public abstract class HBaseServer {
 
             // 4. create a compressed output stream if compression was enabled
             if (call.getRPCCompression() != Compression.Algorithm.NONE) {
-              Compressor compressor = call.getRPCCompression().getCompressor();
+              compressor = call.getRPCCompression().getCompressor();
               OutputStream compressedOutputStream =
                 call.getRPCCompression().createCompressionStream(rawOS, compressor, 0);
               out = new DataOutputStream(compressedOutputStream);
@@ -1168,6 +1174,9 @@ public abstract class HBaseServer {
           buf.flush();
           call.setResponse(buf.getByteBuffer());
           responder.doRespond(call);
+          if (compressor != null) {
+            call.getRPCCompression().returnCompressor(compressor);
+          }
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
             LOG.warn(getName() + " caught: " +

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Tue Aug 28 19:24:54 2012
@@ -96,6 +96,7 @@ public class LoadTestTool extends Abstra
   private static final String OPT_TABLE_NAME = "tn";
   private static final String OPT_ZK_QUORUM = "zk";
   private static final String OPT_PROFILING = "profiling";
+  private static final String OPT_RPC_COMPRESSION = "rpc_compression";
 
   private static final long DEFAULT_START_KEY = 0;
 
@@ -114,7 +115,11 @@ public class LoadTestTool extends Abstra
   private boolean encodeInCacheOnly;
   private Compression.Algorithm compressAlgo;
   private StoreFile.BloomType bloomType;
-
+  
+  // RPC options
+  private Compression.Algorithm txCompression = Compression.Algorithm.NONE;
+  private Compression.Algorithm rxCompression = Compression.Algorithm.NONE;
+  
   // Writer options
   private int numWriterThreads = DEFAULT_NUM_THREADS;
   private long minColsPerKey, maxColsPerKey;
@@ -200,6 +205,8 @@ public class LoadTestTool extends Abstra
         DEFAULT_START_KEY + ".");
     addOptWithArg(OPT_PROFILING, "Percent of reads/writes to request " +
         "profiling data");
+    addOptWithArg(OPT_RPC_COMPRESSION, "RPC compression to use " +
+        "<tx_compression>:<rx_compression>");
   }
 
   @Override
@@ -278,6 +285,17 @@ public class LoadTestTool extends Abstra
       System.out.println ("Requesting profiling data on " + profilePercent +
           "% of reads/writes");
     }
+    
+    if (cmd.hasOption(OPT_RPC_COMPRESSION)) {
+      String [] comp = this.splitColonSeparated(OPT_RPC_COMPRESSION, 2, 2);
+      this.txCompression = Compression.Algorithm.
+          valueOf(comp[0]);
+      this.rxCompression = Compression.Algorithm.
+          valueOf(comp[1]);
+      
+      System.out.println ("txCompression: " + comp[0]);
+      System.out.println ("rxCompression: " + comp[1]);
+    }
 
     System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
   }
@@ -312,7 +330,7 @@ public class LoadTestTool extends Abstra
 
     if (isWrite) {
       writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY, 
-          profilePercent);
+          profilePercent, this.txCompression, this.rxCompression);
       writerThreads.setMultiPut(isMultiPut);
       writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey);
       writerThreads.setDataSize(minColDataSize, maxColDataSize);
@@ -320,7 +338,7 @@ public class LoadTestTool extends Abstra
 
     if (isRead) {
       readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY,
-          verifyPercent, profilePercent);
+          verifyPercent, profilePercent, this.txCompression, this.rxCompression);
       readerThreads.setMaxErrors(maxReadErrors);
       readerThreads.setKeyWindow(keyWindow);
     }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java Tue Aug 28 19:24:54 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.hfile.Compression;
 
 /** Creates multiple threads that read and verify previously written data */
 public class MultiThreadedReader extends MultiThreadedAction
@@ -71,17 +72,26 @@ public class MultiThreadedReader extends
 
   private int maxErrors = DEFAULT_MAX_ERRORS;
   private int keyWindow = DEFAULT_KEY_WINDOW;
+  
+  /** RPC compression */
+  private Compression.Algorithm txCompression;
+  private Compression.Algorithm rxCompression;
 
   public MultiThreadedReader(Configuration conf, byte[] tableName,
       byte[] columnFamily, double verifyPercent) {
-    this (conf, tableName, columnFamily, verifyPercent, 0);
+    this (conf, tableName, columnFamily, verifyPercent, 0,
+        Compression.Algorithm.NONE, Compression.Algorithm.NONE);
   }
   
   public MultiThreadedReader(Configuration conf, byte[] tableName,
-      byte[] columnFamily, double verifyPercent, double profilePercent) {
+      byte[] columnFamily, double verifyPercent, double profilePercent,
+      Compression.Algorithm txCompression, 
+      Compression.Algorithm rxCompression) {
     super(conf, tableName, columnFamily, "R");
     this.verifyPercent = verifyPercent;
     this.profilePercent = profilePercent;
+    this.txCompression = txCompression;
+    this.rxCompression = rxCompression;
   }
 
   public void linkToWriter(MultiThreadedWriter writer) {
@@ -133,6 +143,8 @@ public class MultiThreadedReader extends
     public HBaseReaderThread(int readerId) throws IOException {
       this.readerId = readerId;
       table = new HTable(conf, tableName);
+      table.setTxCompression(txCompression);
+      table.setRxCompression(rxCompression);
       setName(getClass().getSimpleName() + "_" + readerId);
     }
 

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1378276&r1=1378275&r2=1378276&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Tue Aug 28 19:24:54 2012
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.hfile.Compression;
 
 /** Creates multiple threads that write key/values into the */
 public class MultiThreadedWriter extends MultiThreadedAction {
@@ -79,15 +80,24 @@ public class MultiThreadedWriter extends
   /** Enable this if used in conjunction with a concurrent reader. */
   private boolean trackInsertedKeys;
   
+  /** RPC compression */
+  private Compression.Algorithm txCompression;
+  private Compression.Algorithm rxCompression;
+  
   public MultiThreadedWriter(Configuration conf, byte[] tableName,
       byte[] columnFamily) {
-    this (conf, tableName, columnFamily, 0);
+    this (conf, tableName, columnFamily, 0,
+        Compression.Algorithm.NONE, Compression.Algorithm.NONE);
   }
   
   public MultiThreadedWriter(Configuration conf, byte[] tableName,
-      byte[] columnFamily, double profilePercent) {
+      byte[] columnFamily, double profilePercent,
+      Compression.Algorithm txCompression,
+      Compression.Algorithm rxCompression) {
     super(conf, tableName, columnFamily, "W");
     this.profilePercent = profilePercent;
+    this.txCompression = txCompression;
+    this.rxCompression = rxCompression;
   }
 
   /** Use multi-puts vs. separate puts for every column in a row */
@@ -139,6 +149,8 @@ public class MultiThreadedWriter extends
     public HBaseWriterThread(int writerId) throws IOException {
       setName(getClass().getSimpleName() + "_" + writerId);
       table = new HTable(conf, tableName);
+      table.setTxCompression(txCompression);
+      table.setRxCompression(rxCompression);
     }
 
     public void run() {