You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:08:25 UTC

svn commit: r1181438 - in /hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase: client/Result.java io/HbaseObjectWritable.java io/WritableWithSize.java ipc/ByteBufferOutputStream.java ipc/HBaseServer.java

Author: nspiegelberg
Date: Tue Oct 11 02:08:24 2011
New Revision: 1181438

URL: http://svn.apache.org/viewvc?rev=1181438&view=rev
Log:
import HBASE-3199

Summary:
Importing patch for large responses.

With this patch, we avoid the buffer-doubling/resizing approach and instead
determine the exact size of the buffer up front.

I'll separately commit the code to LOG large responses.

Test Plan:
unit tests...

DiffCamp Revision: 179649
Reviewed By: hkuang
Commenters: aravind
CC: achao, hkuang, kannan, aravind, hbase@lists
Revert Plan:
OK

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Result.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Result.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Result.java?rev=1181438&r1=1181437&r2=1181438&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Result.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Result.java Tue Oct 11 02:08:24 2011
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.client;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.SplitKeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.WritableWithSize;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 
@@ -62,7 +63,7 @@ import java.util.TreeMap;
  * through {@link KeyValue#getRow()}, {@link KeyValue#getFamily()}, {@link KeyValue#getQualifier()},
  * {@link KeyValue#getTimestamp()}, and {@link KeyValue#getValue()}.
  */
-public class Result implements Writable {
+public class Result implements Writable, WritableWithSize {
   private static final byte RESULT_VERSION = (byte)1;
 
   private KeyValue [] kvs = null;
@@ -427,6 +428,20 @@ public class Result implements Writable 
     this.kvs = kvs.toArray(new KeyValue[kvs.size()]);
   }
 
+  public long getWritableSize() {
+    if (isEmpty())
+      return Bytes.SIZEOF_INT; // int size = 0
+
+    long size = Bytes.SIZEOF_INT; // totalLen
+
+    for (KeyValue kv : kvs) {
+      size += kv.getLength();
+      size += Bytes.SIZEOF_INT; // kv.getLength
+    }
+
+    return size;
+  }
+
   public void write(final DataOutput out)
   throws IOException {
     if(isEmpty()) {
@@ -444,6 +459,29 @@ public class Result implements Writable 
     }
   }
 
+  public static long getWriteArraySize(Result [] results) {
+    long size = Bytes.SIZEOF_BYTE; // RESULT_VERSION
+    if (results == null || results.length == 0) {
+      size += Bytes.SIZEOF_INT;
+      return size;
+    }
+
+    size += Bytes.SIZEOF_INT; // results.length
+    size += Bytes.SIZEOF_INT; // bufLen
+    for (Result result : results) {
+      size += Bytes.SIZEOF_INT; // either 0 or result.size()
+      if (result == null || result.isEmpty())
+        continue;
+
+      for (KeyValue kv : result.raw()) {
+        size += Bytes.SIZEOF_INT; // kv.getLength();
+        size += kv.getLength();
+      }
+    }
+
+    return size;
+  }
+
   public static void writeArray(final DataOutput out, Result [] results)
   throws IOException {
     // Write version when writing array form.

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1181438&r1=1181437&r2=1181438&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Tue Oct 11 02:08:24 2011
@@ -77,7 +77,7 @@ import org.apache.hadoop.hbase.util.Byte
  * name and reflection to instantiate class was costing in excess of the cell
  * handling).
  */
-public class HbaseObjectWritable implements Writable, Configurable {
+public class HbaseObjectWritable implements Writable, WritableWithSize, Configurable {
   protected final static Log LOG = LogFactory.getLog(HbaseObjectWritable.class);
 
   // Here we maintain two static maps of classes to code and vice versa.
@@ -231,6 +231,10 @@ public class HbaseObjectWritable impleme
     writeObject(out, instance, declaredClass, conf);
   }
 
+  public long getWritableSize() {
+    return getWritableSize(instance, declaredClass, conf);
+  }
+
   private static class NullInstance extends Configured implements Writable {
     Class<?> declaredClass;
     /** default constructor for writable */
@@ -282,6 +286,27 @@ public class HbaseObjectWritable impleme
     out.writeByte(code);
   }
 
+
+  public static long getWritableSize(Object instance, Class declaredClass,
+                                     Configuration conf) {
+    long size = Bytes.SIZEOF_BYTE; // code
+    if (instance == null) {
+      return 0L;
+    }
+
+    if (declaredClass.isArray()) {
+      if (declaredClass.equals(Result[].class)) {
+
+        return size + Result.getWriteArraySize((Result[])instance);
+      }
+    }
+    if (declaredClass.equals(Result.class)) {
+      Result r = (Result) instance;
+      // one extra class code for writable instance.
+      return r.getWritableSize() + size + Bytes.SIZEOF_BYTE;
+    }
+    return 0L; // no hint is the default.
+  }
   /**
    * Write a {@link Writable}, {@link String}, primitive type, or an array of
    * the preceding.

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java?rev=1181438&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java Tue Oct 11 02:08:24 2011
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+/**
+ * An optional interface to 'size' writables.
+ */
+public interface WritableWithSize {
+  /**
+   * Provide a size hint to the caller. write() should ideally
+   * not go beyond this if at all possible.
+   *
+   * You can return 0 if there is no size hint.
+   *
+   * @return the size of the writable
+   */
+  public long getWritableSize();
+}

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java?rev=1181438&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java Tue Oct 11 02:08:24 2011
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Not thread safe!
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+  protected ByteBuffer buf;
+
+  public ByteBufferOutputStream(int capacity) {
+    this(capacity, false);
+  }
+
+  public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
+    if (useDirectByteBuffer) {
+      buf = ByteBuffer.allocateDirect(capacity);
+    } else {
+      buf = ByteBuffer.allocate(capacity);
+    }
+  }
+
+  public int size() {
+    return buf.position();
+  }
+
+  /**
+   * This flips the underlying BB so be sure to use it _last_!
+   * @return the underlying buffer, flipped so it is ready to be read
+   */
+  public ByteBuffer getByteBuffer() {
+    buf.flip();
+    return buf;
+  }
+
+  private void checkSizeAndGrow(int extra) {
+    if ( (buf.position() + extra) > buf.limit()) {
+      int newSize = (int)Math.min((((long)buf.capacity()) * 2),
+                                  (long)(Integer.MAX_VALUE));
+      newSize = Math.max(newSize, buf.position() + extra);
+      ByteBuffer newBuf = ByteBuffer.allocate(newSize);
+      buf.flip();
+      newBuf.put(buf);
+      buf = newBuf;
+    }
+  }
+
+  // OutputStream
+  @Override
+  public void write(int b) throws IOException {
+    checkSizeAndGrow(Bytes.SIZEOF_BYTE);
+
+    buf.put((byte)b);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    checkSizeAndGrow(b.length);
+
+    buf.put(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkSizeAndGrow(len);
+
+    buf.put(b, off, len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // noop
+  }
+
+  @Override
+  public void close() throws IOException {
+    // noop again. heh
+  }
+}

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1181438&r1=1181437&r2=1181438&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Oct 11 02:08:24 2011
@@ -23,6 +23,8 @@ package org.apache.hadoop.hbase.ipc;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -85,6 +87,14 @@ public abstract class HBaseServer {
    */
   private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
 
+  private static final String WARN_RESPONSE_SIZE =
+      "hbase.ipc.warn.response.size";
+
+  /** Default value for above param */
+  private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
+
+  private final int warnResponseSize;
+
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
 
@@ -889,6 +899,7 @@ public abstract class HBaseServer {
 
   /** Handles queued calls . */
   private class Handler extends Thread {
+    static final int BUFFER_INITIAL_SIZE = 1024;
     public Handler(int instanceNumber) {
       this.setDaemon(true);
       this.setName("IPC Server handler "+ instanceNumber + " on " + port);
@@ -898,8 +909,6 @@ public abstract class HBaseServer {
     public void run() {
       LOG.info(getName() + ": starting");
       SERVER.set(HBaseServer.this);
-      final int buffersize = 16 * 1024;
-      ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize);
       while (running) {
         try {
           Call call = callQueue.take(); // pop the queue; maybe blocked here
@@ -925,14 +934,25 @@ public abstract class HBaseServer {
           UserGroupInformation.setCurrentUser(previous);
           CurCall.set(null);
 
-          if (buf.size() > buffersize) {
-            // Allocate a new BAOS as reset only moves size back to zero but
-            // keeps the buffer of whatever the largest write was -- see
-            // hbase-900.
-            buf = new ByteArrayOutputStream(buffersize);
-          } else {
-            buf.reset();
+          int size = BUFFER_INITIAL_SIZE;
+          if (value instanceof WritableWithSize) {
+            // get the size hint.
+            WritableWithSize ohint = (WritableWithSize)value;
+            long hint = ohint.getWritableSize();
+            if (hint > 0) {
+              hint = hint + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
+              if (hint > Integer.MAX_VALUE) {
+                // oops, new problem.
+                IOException ioe =
+                    new IOException("Result buffer size too large: " + hint);
+                errorClass = ioe.getClass().getName();
+                error = StringUtils.stringifyException(ioe);
+              } else {
+                size = ((int)hint);
+              }
+            }
           }
+          ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
           DataOutputStream out = new DataOutputStream(buf);
           out.writeInt(call.id);                // write call id
           out.writeBoolean(error != null);      // write error flag
@@ -943,7 +963,14 @@ public abstract class HBaseServer {
             WritableUtils.writeString(out, errorClass);
             WritableUtils.writeString(out, error);
           }
-          call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+
+          if (buf.size() > warnResponseSize) {
+            LOG.warn(getName()+", responseTooLarge for: "+call+": Size: "
+                     + StringUtils.humanReadableInt(buf.size()));
+          }
+
+
+          call.setResponse(buf.getByteBuffer());
           responder.doRespond(call);
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
@@ -1006,6 +1033,10 @@ public abstract class HBaseServer {
     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
     this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
 
+    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
+                                        DEFAULT_WARN_RESPONSE_SIZE);
+
+
     // Create the responder here
     responder = new Responder();
   }