You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2015/07/22 21:52:21 UTC

[03/50] [abbrv] hbase git commit: HBASE-13230 [mob] reads hang when trying to read rows with large mobs (>10MB)

HBASE-13230 [mob] reads hang when trying to read rows with large mobs (>10MB)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aedd0ebe
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aedd0ebe
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aedd0ebe

Branch: refs/heads/master
Commit: aedd0ebe9b8d1968f7d91772b369829e00606087
Parents: 8c1edeb
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Fri Mar 13 10:37:52 2015 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Fri Mar 13 10:45:46 2015 -0700

----------------------------------------------------------------------
 .../hbase/ipc/AsyncServerResponseHandler.java   |  3 +-
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |  3 +-
 .../hadoop/hbase/protobuf/ProtobufUtil.java     | 93 ++++++++++++++++++--
 .../hbase/regionserver/TestMobStoreScanner.java | 29 ++++--
 4 files changed, 112 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/aedd0ebe/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index d71bf5e..43028c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.ipc.RemoteException;
 
@@ -92,7 +93,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
         // Call may be null because it may have timedout and been cleaned up on this side already
         if (call.responseDefaultType != null) {
           Message.Builder builder = call.responseDefaultType.newBuilderForType();
-          builder.mergeDelimitedFrom(in);
+          ProtobufUtil.mergeDelimitedFrom(builder, in);
           value = builder.build();
         }
         CellScanner cellBlockScanner = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/aedd0ebe/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 9400a2c..dad5164 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
@@ -950,7 +951,7 @@ public class RpcClientImpl extends AbstractRpcClient {
           Message value = null;
           if (call.responseDefaultType != null) {
             Builder builder = call.responseDefaultType.newBuilderForType();
-            builder.mergeDelimitedFrom(in);
+            ProtobufUtil.mergeDelimitedFrom(builder, in);
             value = builder.build();
           }
           CellScanner cellBlockScanner = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/aedd0ebe/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index caae1bb..7bb9de1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.protobuf;
 import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
 
 import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -37,6 +39,7 @@ import java.util.Map.Entry;
 import java.util.NavigableSet;
 import java.util.concurrent.TimeUnit;
 
+import com.google.protobuf.*;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -149,14 +152,6 @@ import org.apache.hadoop.security.token.Token;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
 
 /**
  * Protobufs utility.
@@ -2994,4 +2989,86 @@ public final class ProtobufUtil {
 
     return desc.build();
   }
+
+
+
+  /**
+   * This version of protobuf's mergeDelimitedFrom avoid the hard-coded 64MB limit for decoding
+   * buffers
+   * @param builder current message builder
+   * @param in Inputsream with delimited protobuf data
+   * @throws IOException
+   */
+  public static void mergeDelimitedFrom(Message.Builder builder, InputStream in) throws IOException {
+    // This used to be builder.mergeDelimitedFrom(in);
+    // but is replaced to allow us to bump the protobuf size limit.
+    final int firstByte = in.read();
+    if (firstByte == -1) {
+      // bail out. (was return false;)
+    } else {
+      final int size = CodedInputStream.readRawVarint32(firstByte, in);
+      final InputStream limitedInput = new LimitedInputStream(in, size);
+      final CodedInputStream codedInput = CodedInputStream.newInstance(limitedInput);
+      codedInput.setSizeLimit(size);
+      builder.mergeFrom(codedInput);
+      codedInput.checkLastTagWas(0);
+    }
+  }
+
+  /**
+   * This is cut and paste from protobuf's package private AbstractMessageLite.
+   *
+   * An InputStream implementations which reads from some other InputStream
+   * but is limited to a particular number of bytes.  Used by
+   * mergeDelimitedFrom().  This is intentionally package-private so that
+   * UnknownFieldSet can share it.
+   */
+  static final class LimitedInputStream extends FilterInputStream {
+    private int limit;
+
+    LimitedInputStream(InputStream in, int limit) {
+      super(in);
+      this.limit = limit;
+    }
+
+    @Override
+    public int available() throws IOException {
+      return Math.min(super.available(), limit);
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (limit <= 0) {
+        return -1;
+      }
+      final int result = super.read();
+      if (result >= 0) {
+        --limit;
+      }
+      return result;
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, int len)
+            throws IOException {
+      if (limit <= 0) {
+        return -1;
+      }
+      len = Math.min(len, limit);
+      final int result = super.read(b, off, len);
+      if (result >= 0) {
+        limit -= result;
+      }
+      return result;
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+      final long result = super.skip(Math.min(n, limit));
+      if (result >= 0) {
+        limit -= result;
+      }
+      return result;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/aedd0ebe/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
index 1112b12..27a0b06 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
@@ -31,13 +31,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -70,6 +66,7 @@ public class TestMobStoreScanner {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
     TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 100*1024*1024);
 
     TEST_UTIL.startMiniCluster(1);
   }
@@ -136,6 +133,26 @@ public class TestMobStoreScanner {
     testGetFromArchive(true);
   }
 
+  @Test(timeout=60000)
+  public void testGetMassive() throws Exception {
+    String TN = "testGetMassive";
+    setUp(defaultThreshold, TN);
+
+    // Put some data 5 10, 15, 20  mb ok  (this would be right below protobuf default max size of 64MB.
+    // 25, 30, 40 fail.  these is above protobuf max size of 64MB
+    byte[] bigValue = new byte[25*1024*1024];
+
+    Put put = new Put(row1);
+    put.add(family, qf1, bigValue);
+    put.add(family, qf2, bigValue);
+    put.add(family, qf3, bigValue);
+    table.put(put);
+
+    Get g = new Get(row1);
+    Result r = table.get(g);
+    // should not have blown up.
+  }
+
   public void testGetFromFiles(boolean reversed) throws Exception {
     String TN = "testGetFromFiles" + reversed;
     setUp(defaultThreshold, TN);