You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2015/07/22 21:52:21 UTC
[03/50] [abbrv] hbase git commit: HBASE-13230 [mob] reads hang when
trying to read rows with large mobs (>10MB)
HBASE-13230 [mob] reads hang when trying to read rows with large mobs (>10MB)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aedd0ebe
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aedd0ebe
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aedd0ebe
Branch: refs/heads/master
Commit: aedd0ebe9b8d1968f7d91772b369829e00606087
Parents: 8c1edeb
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Fri Mar 13 10:37:52 2015 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Fri Mar 13 10:45:46 2015 -0700
----------------------------------------------------------------------
.../hbase/ipc/AsyncServerResponseHandler.java | 3 +-
.../apache/hadoop/hbase/ipc/RpcClientImpl.java | 3 +-
.../hadoop/hbase/protobuf/ProtobufUtil.java | 93 ++++++++++++++++++--
.../hbase/regionserver/TestMobStoreScanner.java | 29 ++++--
4 files changed, 112 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/aedd0ebe/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index d71bf5e..43028c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.ipc.RemoteException;
@@ -92,7 +93,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
// Call may be null because it may have timedout and been cleaned up on this side already
if (call.responseDefaultType != null) {
Message.Builder builder = call.responseDefaultType.newBuilderForType();
- builder.mergeDelimitedFrom(in);
+ ProtobufUtil.mergeDelimitedFrom(builder, in);
value = builder.build();
}
CellScanner cellBlockScanner = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/aedd0ebe/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 9400a2c..dad5164 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
@@ -950,7 +951,7 @@ public class RpcClientImpl extends AbstractRpcClient {
Message value = null;
if (call.responseDefaultType != null) {
Builder builder = call.responseDefaultType.newBuilderForType();
- builder.mergeDelimitedFrom(in);
+ ProtobufUtil.mergeDelimitedFrom(builder, in);
value = builder.build();
}
CellScanner cellBlockScanner = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/aedd0ebe/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index caae1bb..7bb9de1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.protobuf;
import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -37,6 +39,7 @@ import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.concurrent.TimeUnit;
+import com.google.protobuf.*;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -149,14 +152,6 @@ import org.apache.hadoop.security.token.Token;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
/**
* Protobufs utility.
@@ -2994,4 +2989,86 @@ public final class ProtobufUtil {
return desc.build();
}
+
+
+
+ /**
+ * This version of protobuf's mergeDelimitedFrom avoids the hard-coded 64MB limit for decoding
+ * buffers by raising the decoder's size limit to the length read from the message's own prefix.
+ * @param builder current message builder
+ * @param in InputStream with delimited protobuf data
+ * @throws IOException if reading from the stream or merging the message into the builder fails
+ */
+ public static void mergeDelimitedFrom(Message.Builder builder, InputStream in) throws IOException {
+ // This used to be builder.mergeDelimitedFrom(in);
+ // but is replaced to allow us to bump the protobuf size limit.
+ final int firstByte = in.read();
+ if (firstByte == -1) {
+ // Stream is already at EOF, so there is nothing to merge: bail out. (was return false;)
+ } else {
+ final int size = CodedInputStream.readRawVarint32(firstByte, in);
+ final InputStream limitedInput = new LimitedInputStream(in, size);
+ final CodedInputStream codedInput = CodedInputStream.newInstance(limitedInput);
+ codedInput.setSizeLimit(size);
+ builder.mergeFrom(codedInput);
+ codedInput.checkLastTagWas(0);
+ }
+ }
+
+ /**
+ * This is cut and paste from protobuf's package private AbstractMessageLite.
+ *
+ * An InputStream implementation which reads from some other InputStream
+ * but is limited to a particular number of bytes. Used by
+ * mergeDelimitedFrom(). (Remark carried over from the protobuf original:
+ * intentionally package-private so that UnknownFieldSet can share it.)
+ */
+ static final class LimitedInputStream extends FilterInputStream {
+ private int limit;
+
+ LimitedInputStream(InputStream in, int limit) {
+ super(in);
+ this.limit = limit;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return Math.min(super.available(), limit);
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (limit <= 0) {
+ return -1;
+ }
+ final int result = super.read();
+ if (result >= 0) {
+ --limit;
+ }
+ return result;
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, int len)
+ throws IOException {
+ if (limit <= 0) {
+ return -1;
+ }
+ len = Math.min(len, limit);
+ final int result = super.read(b, off, len);
+ if (result >= 0) {
+ limit -= result;
+ }
+ return result;
+ }
+
+ @Override
+ public long skip(final long n) throws IOException {
+ final long result = super.skip(Math.min(n, limit));
+ if (result >= 0) {
+ limit -= result;
+ }
+ return result;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/aedd0ebe/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
index 1112b12..27a0b06 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
@@ -31,13 +31,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -70,6 +66,7 @@ public class TestMobStoreScanner {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 100*1024*1024);
TEST_UTIL.startMiniCluster(1);
}
@@ -136,6 +133,26 @@ public class TestMobStoreScanner {
testGetFromArchive(true);
}
+ @Test(timeout=60000)
+ public void testGetMassive() throws Exception {
+ String TN = "testGetMassive";
+ setUp(defaultThreshold, TN);
+
+ // Put some data. 5, 10, 15, 20 MB values are ok (right below protobuf's default max size of 64MB);
+ // 25, 30, 40 MB fail: with three values per row these are above protobuf's max size of 64MB.
+ byte[] bigValue = new byte[25*1024*1024];
+
+ Put put = new Put(row1);
+ put.add(family, qf1, bigValue);
+ put.add(family, qf2, bigValue);
+ put.add(family, qf3, bigValue);
+ table.put(put);
+
+ Get g = new Get(row1);
+ Result r = table.get(g);
+ // should not have blown up.
+ }
+
public void testGetFromFiles(boolean reversed) throws Exception {
String TN = "testGetFromFiles" + reversed;
setUp(defaultThreshold, TN);