You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:08:25 UTC
svn commit: r1181438 - in
/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase:
client/Result.java io/HbaseObjectWritable.java io/WritableWithSize.java
ipc/ByteBufferOutputStream.java ipc/HBaseServer.java
Author: nspiegelberg
Date: Tue Oct 11 02:08:24 2011
New Revision: 1181438
URL: http://svn.apache.org/viewvc?rev=1181438&view=rev
Log:
import HBASE-3199
Summary:
Importing patch for large responses.
With this patch, we avoid doubling the buffer/resizing approach, and determine
the exact size of the buffer.
I'll separately commit the code to LOG large responses.
Test Plan:
unit tests...
DiffCamp Revision: 179649
Reviewed By: hkuang
Commenters: aravind
CC: achao, hkuang, kannan, aravind, hbase@lists
Revert Plan:
OK
Added:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Result.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Result.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Result.java?rev=1181438&r1=1181437&r2=1181438&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Result.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Result.java Tue Oct 11 02:08:24 2011
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.SplitKeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
@@ -62,7 +63,7 @@ import java.util.TreeMap;
* through {@link KeyValue#getRow()}, {@link KeyValue#getFamily()}, {@link KeyValue#getQualifier()},
* {@link KeyValue#getTimestamp()}, and {@link KeyValue#getValue()}.
*/
-public class Result implements Writable {
+public class Result implements Writable, WritableWithSize {
private static final byte RESULT_VERSION = (byte)1;
private KeyValue [] kvs = null;
@@ -427,6 +428,20 @@ public class Result implements Writable
this.kvs = kvs.toArray(new KeyValue[kvs.size()]);
}
+ public long getWritableSize() {
+ if (isEmpty())
+ return Bytes.SIZEOF_INT; // int size = 0
+
+ long size = Bytes.SIZEOF_INT; // totalLen
+
+ for (KeyValue kv : kvs) {
+ size += kv.getLength();
+ size += Bytes.SIZEOF_INT; // kv.getLength
+ }
+
+ return size;
+ }
+
public void write(final DataOutput out)
throws IOException {
if(isEmpty()) {
@@ -444,6 +459,29 @@ public class Result implements Writable
}
}
+ public static long getWriteArraySize(Result [] results) {
+ long size = Bytes.SIZEOF_BYTE; // RESULT_VERSION
+ if (results == null || results.length == 0) {
+ size += Bytes.SIZEOF_INT;
+ return size;
+ }
+
+ size += Bytes.SIZEOF_INT; // results.length
+ size += Bytes.SIZEOF_INT; // bufLen
+ for (Result result : results) {
+ size += Bytes.SIZEOF_INT; // either 0 or result.size()
+ if (result == null || result.isEmpty())
+ continue;
+
+ for (KeyValue kv : result.raw()) {
+ size += Bytes.SIZEOF_INT; // kv.getLength();
+ size += kv.getLength();
+ }
+ }
+
+ return size;
+ }
+
public static void writeArray(final DataOutput out, Result [] results)
throws IOException {
// Write version when writing array form.
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1181438&r1=1181437&r2=1181438&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Tue Oct 11 02:08:24 2011
@@ -77,7 +77,7 @@ import org.apache.hadoop.hbase.util.Byte
* name and reflection to instantiate class was costing in excess of the cell
* handling).
*/
-public class HbaseObjectWritable implements Writable, Configurable {
+public class HbaseObjectWritable implements Writable, WritableWithSize, Configurable {
protected final static Log LOG = LogFactory.getLog(HbaseObjectWritable.class);
// Here we maintain two static maps of classes to code and vice versa.
@@ -231,6 +231,10 @@ public class HbaseObjectWritable impleme
writeObject(out, instance, declaredClass, conf);
}
+ public long getWritableSize() {
+ return getWritableSize(instance, declaredClass, conf);
+ }
+
private static class NullInstance extends Configured implements Writable {
Class<?> declaredClass;
/** default constructor for writable */
@@ -282,6 +286,27 @@ public class HbaseObjectWritable impleme
out.writeByte(code);
}
+
+ public static long getWritableSize(Object instance, Class declaredClass,
+ Configuration conf) {
+ long size = Bytes.SIZEOF_BYTE; // code
+ if (instance == null) {
+ return 0L;
+ }
+
+ if (declaredClass.isArray()) {
+ if (declaredClass.equals(Result[].class)) {
+
+ return size + Result.getWriteArraySize((Result[])instance);
+ }
+ }
+ if (declaredClass.equals(Result.class)) {
+ Result r = (Result) instance;
+ // one extra class code for writable instance.
+ return r.getWritableSize() + size + Bytes.SIZEOF_BYTE;
+ }
+ return 0L; // no hint is the default.
+ }
/**
* Write a {@link Writable}, {@link String}, primitive type, or an array of
* the preceding.
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java?rev=1181438&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java Tue Oct 11 02:08:24 2011
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+/**
+ * An optional interface to 'size' writables.
+ */
+public interface WritableWithSize {
+ /**
+ * Provide a size hint to the caller. write() should ideally
+ * not go beyond this if at all possible.
+ *
+ * You can return 0 if there is no size hint.
+ *
+ * @return the size of the writable
+ */
+ public long getWritableSize();
+}
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java?rev=1181438&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java Tue Oct 11 02:08:24 2011
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Not thread safe!
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+ protected ByteBuffer buf;
+
+ public ByteBufferOutputStream(int capacity) {
+ this(capacity, false);
+ }
+
+ public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
+ if (useDirectByteBuffer) {
+ buf = ByteBuffer.allocateDirect(capacity);
+ } else {
+ buf = ByteBuffer.allocate(capacity);
+ }
+ }
+
+ public int size() {
+ return buf.position();
+ }
+
+ /**
+ * This flips the underlying BB so be sure to use it _last_!
+ * @return
+ */
+ public ByteBuffer getByteBuffer() {
+ buf.flip();
+ return buf;
+ }
+
+ private void checkSizeAndGrow(int extra) {
+ if ( (buf.position() + extra) > buf.limit()) {
+ int newSize = (int)Math.min((((long)buf.capacity()) * 2),
+ (long)(Integer.MAX_VALUE));
+ newSize = Math.max(newSize, buf.position() + extra);
+ ByteBuffer newBuf = ByteBuffer.allocate(newSize);
+ buf.flip();
+ newBuf.put(buf);
+ buf = newBuf;
+ }
+ }
+
+ // OutputStream
+ @Override
+ public void write(int b) throws IOException {
+ checkSizeAndGrow(Bytes.SIZEOF_BYTE);
+
+ buf.put((byte)b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ checkSizeAndGrow(b.length);
+
+ buf.put(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ checkSizeAndGrow(len);
+
+ buf.put(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ // noop
+ }
+
+ @Override
+ public void close() throws IOException {
+ // noop again. heh
+ }
+}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1181438&r1=1181437&r2=1181438&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Oct 11 02:08:24 2011
@@ -23,6 +23,8 @@ package org.apache.hadoop.hbase.ipc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -85,6 +87,14 @@ public abstract class HBaseServer {
*/
private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
+ private static final String WARN_RESPONSE_SIZE =
+ "hbase.ipc.warn.response.size";
+
+ /** Default value for above param */
+ private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
+
+ private final int warnResponseSize;
+
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
@@ -889,6 +899,7 @@ public abstract class HBaseServer {
/** Handles queued calls . */
private class Handler extends Thread {
+ static final int BUFFER_INITIAL_SIZE = 1024;
public Handler(int instanceNumber) {
this.setDaemon(true);
this.setName("IPC Server handler "+ instanceNumber + " on " + port);
@@ -898,8 +909,6 @@ public abstract class HBaseServer {
public void run() {
LOG.info(getName() + ": starting");
SERVER.set(HBaseServer.this);
- final int buffersize = 16 * 1024;
- ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize);
while (running) {
try {
Call call = callQueue.take(); // pop the queue; maybe blocked here
@@ -925,14 +934,25 @@ public abstract class HBaseServer {
UserGroupInformation.setCurrentUser(previous);
CurCall.set(null);
- if (buf.size() > buffersize) {
- // Allocate a new BAOS as reset only moves size back to zero but
- // keeps the buffer of whatever the largest write was -- see
- // hbase-900.
- buf = new ByteArrayOutputStream(buffersize);
- } else {
- buf.reset();
+ int size = BUFFER_INITIAL_SIZE;
+ if (value instanceof WritableWithSize) {
+ // get the size hint.
+ WritableWithSize ohint = (WritableWithSize)value;
+ long hint = ohint.getWritableSize();
+ if (hint > 0) {
+ hint = hint + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
+ if (hint > Integer.MAX_VALUE) {
+ // oops, new problem.
+ IOException ioe =
+ new IOException("Result buffer size too large: " + hint);
+ errorClass = ioe.getClass().getName();
+ error = StringUtils.stringifyException(ioe);
+ } else {
+ size = ((int)hint);
+ }
+ }
}
+ ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf);
out.writeInt(call.id); // write call id
out.writeBoolean(error != null); // write error flag
@@ -943,7 +963,14 @@ public abstract class HBaseServer {
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
- call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+
+ if (buf.size() > warnResponseSize) {
+ LOG.warn(getName()+", responseTooLarge for: "+call+": Size: "
+ + StringUtils.humanReadableInt(buf.size()));
+ }
+
+
+ call.setResponse(buf.getByteBuffer());
responder.doRespond(call);
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
@@ -1006,6 +1033,10 @@ public abstract class HBaseServer {
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
+ this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
+ DEFAULT_WARN_RESPONSE_SIZE);
+
+
// Create the responder here
responder = new Responder();
}