Posted to commits@hbase.apache.org by ra...@apache.org on 2011/03/15 23:01:59 UTC

svn commit: r1081959 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/io/hfile/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/hadoop/hbase/util/

Author: rawson
Date: Tue Mar 15 22:01:59 2011
New Revision: 1081959

URL: http://svn.apache.org/viewvc?rev=1081959&view=rev
Log:
HBASE-3514  Speedup HFile.Writer append (Matteo Bertozzi via Ryan)

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferOutputStream.java
Removed:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1081959&r1=1081958&r2=1081959&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Mar 15 22:01:59 2011
@@ -68,6 +68,7 @@ Release 0.91.0 - Unreleased
    HBASE-3538  Column families allow to have slashes in name (Ian Knome via Stack)
    HBASE-3313  Table name isn't checked in isTableEnabled/isTableDisabled
                (Ted Yu via Stack)
+   HBASE-3514  Speedup HFile.Writer append (Matteo Bertozzi via Ryan)
 
   IMPROVEMENTS
    HBASE-3290  Max Compaction Size (Nicolas Spiegelberg via Stack)  

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1081959&r1=1081958&r2=1081959&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Tue Mar 15 22:01:59 2011
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.ByteBloomFilter;
+import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CompressionTest;
@@ -221,9 +222,6 @@ public class HFile {
     // Used to ensure we write in order.
     private final RawComparator<byte []> comparator;
 
-    // A stream made per block written.
-    private DataOutputStream out;
-
     // Number of uncompressed bytes per block.  Reinitialized when we start
     // new block.
     private int blocksize;
@@ -264,9 +262,9 @@ public class HFile {
     // Block cache to optionally fill on write
     private BlockCache blockCache;
 
-    // Additional byte array output stream used to fill block cache
-    private ByteArrayOutputStream baos;
-    private DataOutputStream baosDos;
+    // Byte buffer output stream made per block written.
+    private ByteBufferOutputStream bbos = null;
+    private DataOutputStream bbosDos = null;
     private int blockNumber = 0;
 
     /**
@@ -360,7 +358,7 @@ public class HFile {
      * @throws IOException
      */
     private void checkBlockBoundary() throws IOException {
-      if (this.out != null && this.out.size() < blocksize) return;
+      if (bbosDos != null && bbosDos.size() < blocksize) return;
       finishBlock();
       newBlock();
     }
@@ -370,11 +368,18 @@ public class HFile {
      * @throws IOException
      */
     private void finishBlock() throws IOException {
-      if (this.out == null) return;
+      if (bbosDos == null) return;
+
+      // Flush Data Output Stream
+      bbosDos.flush();
+
+      // Compress Data and write to output stream
+      DataOutputStream compressStream = getCompressingStream();
+      bbos.writeTo(compressStream);
+      int size = releaseCompressingStream(compressStream);
+
       long now = System.currentTimeMillis();
 
-      int size = releaseCompressingStream(this.out);
-      this.out = null;
       blockKeys.add(firstKey);
       blockOffsets.add(Long.valueOf(blockBegin));
       blockDataSizes.add(Integer.valueOf(size));
@@ -384,14 +389,16 @@ public class HFile {
       writeOps++;
 
       if (blockCache != null) {
-        baosDos.flush();
-        byte [] bytes = baos.toByteArray();
-        ByteBuffer blockToCache = ByteBuffer.wrap(bytes, DATABLOCKMAGIC.length,
-            bytes.length - DATABLOCKMAGIC.length);
+        byte[] bytes = bbos.toByteArray(DATABLOCKMAGIC.length, bbos.size() - DATABLOCKMAGIC.length);
+        ByteBuffer blockToCache = ByteBuffer.wrap(bytes);
         String blockName = path.toString() + blockNumber;
         blockCache.cacheBlock(blockName, blockToCache);
-        baosDos.close();
       }
+
+      bbosDos.close();
+      bbosDos = null;
+      bbos = null;
+
       blockNumber++;
     }
 
@@ -402,14 +409,16 @@ public class HFile {
     private void newBlock() throws IOException {
       // This is where the next block begins.
       blockBegin = outputStream.getPos();
-      this.out = getCompressingStream();
-      this.out.write(DATABLOCKMAGIC);
+
       firstKey = null;
-      if (blockCache != null) {
-        this.baos = new ByteArrayOutputStream();
-        this.baosDos = new DataOutputStream(baos);
-        this.baosDos.write(DATABLOCKMAGIC);
-      }
+
+      // to avoid too many calls to realloc(),
+      // pre-allocates the byte stream to the block size + 25%
+      // only if blocksize is under 1Gb
+      int bbosBlocksize = Math.max(blocksize, blocksize + (blocksize / 4));
+      bbos = new ByteBufferOutputStream(bbosBlocksize);
+      bbosDos = new DataOutputStream(bbos);
+      bbosDos.write(DATABLOCKMAGIC);
     }
 
     /*
@@ -467,7 +476,7 @@ public class HFile {
       for (i = 0; i < metaNames.size(); ++i) {
         // stop when the current key is greater than our own
         byte[] cur = metaNames.get(i);
-        if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, 
+        if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length,
             key, 0, key.length) > 0) {
           break;
         }
@@ -563,12 +572,12 @@ public class HFile {
         checkBlockBoundary();
       }
       // Write length of key and value and then actual key and value bytes.
-      this.out.writeInt(klength);
+      this.bbosDos.writeInt(klength);
       this.keylength += klength;
-      this.out.writeInt(vlength);
+      this.bbosDos.writeInt(vlength);
       this.valuelength += vlength;
-      this.out.write(key, koffset, klength);
-      this.out.write(value, voffset, vlength);
+      this.bbosDos.write(key, koffset, klength);
+      this.bbosDos.write(value, voffset, vlength);
       // Are we the first key in this block?
       if (this.firstKey == null) {
         // Copy the key.
@@ -579,13 +588,6 @@ public class HFile {
       this.lastKeyOffset = koffset;
       this.lastKeyLength = klength;
       this.entryCount ++;
-      // If we are pre-caching blocks on write, fill byte array stream
-      if (blockCache != null) {
-        this.baosDos.writeInt(klength);
-        this.baosDos.writeInt(vlength);
-        this.baosDos.write(key, koffset, klength);
-        this.baosDos.write(value, voffset, vlength);
-      }
     }
 
     /*

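For readers skimming the diff: the Writer no longer opens a compressing stream per block and pushes every key/value through the codec as it arrives. Each block is now buffered uncompressed in the new ByteBufferOutputStream (added below) and compressed in a single writeTo() pass when the block is finished; the same buffer also feeds the block cache, so the old parallel ByteArrayOutputStream goes away. The following standalone sketch (not part of the commit) illustrates that pattern; GZIPOutputStream stands in for the pluggable codec behind getCompressingStream(), the block magic is a placeholder value, and the index, bloom filter, and metrics bookkeeping are omitted.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.hbase.util.ByteBufferOutputStream;

/** Simplified sketch of the new per-block write path; not the real HFile.Writer. */
public class BlockBufferingSketch {
  // placeholder block magic; the real writer uses HFile's DATABLOCKMAGIC
  private static final byte[] MAGIC = "BLOCKMAGIC".getBytes();
  private final int blocksize = 64 * 1024;
  // stand-in for the HFile writer's FSDataOutputStream
  private final ByteArrayOutputStream fileOut = new ByteArrayOutputStream();

  private ByteBufferOutputStream bbos;
  private DataOutputStream bbosDos;

  private void newBlock() throws IOException {
    // pre-allocate blocksize + 25% so a typical block never regrows the buffer
    bbos = new ByteBufferOutputStream(blocksize + blocksize / 4);
    bbosDos = new DataOutputStream(bbos);
    bbosDos.write(MAGIC);
  }

  private void finishBlock() throws IOException {
    if (bbosDos == null) return;
    bbosDos.flush();
    // compress the whole buffered block in one pass; GZIP stands in for the
    // codec obtained via getCompressingStream() in the real writer
    GZIPOutputStream compress = new GZIPOutputStream(fileOut);
    bbos.writeTo(compress);
    compress.finish();
    bbosDos = null;
    bbos = null;
  }

  public void append(byte[] key, byte[] value) throws IOException {
    if (bbosDos == null || bbosDos.size() >= blocksize) {
      finishBlock();   // no-op before the very first block
      newBlock();
    }
    bbosDos.writeInt(key.length);
    bbosDos.writeInt(value.length);
    bbosDos.write(key);
    bbosDos.write(value);
  }

  public static void main(String[] args) throws IOException {
    BlockBufferingSketch writer = new BlockBufferingSketch();
    for (int i = 0; i < 10000; i++) {
      writer.append(("row-" + i).getBytes(), ("value-" + i).getBytes());
    }
    writer.finishBlock();
    System.out.println("compressed bytes written: " + writer.fileOut.size());
  }
}

The pre-allocation mirrors the comment in newBlock() above: for a 64 KB block it reserves 80 KB up front, so the buffer only has to grow for blocks that overshoot the boundary.
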
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1081959&r1=1081958&r2=1081959&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Mar 15 22:01:59 2011
@@ -60,6 +60,7 @@ import org.apache.hadoop.ipc.VersionedPr
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferOutputStream.java?rev=1081959&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferOutputStream.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferOutputStream.java Tue Mar 15 22:01:59 2011
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Not thread safe!
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+  protected ByteBuffer buf;
+
+  public ByteBufferOutputStream(int capacity) {
+    this(capacity, false);
+  }
+
+  public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
+    if (useDirectByteBuffer) {
+      buf = ByteBuffer.allocateDirect(capacity);
+    } else {
+      buf = ByteBuffer.allocate(capacity);
+    }
+  }
+
+  public int size() {
+    return buf.position();
+  }
+
+  /**
+   * This flips the underlying BB so be sure to use it _last_!
+   * @return ByteBuffer
+   */
+  public ByteBuffer getByteBuffer() {
+    buf.flip();
+    return buf;
+  }
+
+  private void checkSizeAndGrow(int extra) {
+    if ( (buf.position() + extra) > buf.limit()) {
+      // size calculation is complex, because we could overflow negative,
+      // and/or not allocate enough space. this fixes that.
+      int newSize = (int)Math.min((((long)buf.capacity()) * 2),
+          (long)(Integer.MAX_VALUE));
+      newSize = Math.max(newSize, buf.position() + extra);
+
+      ByteBuffer newBuf = ByteBuffer.allocate(newSize);
+      buf.flip();
+      newBuf.put(buf);
+      buf = newBuf;
+    }
+  }
+
+  // OutputStream
+  @Override
+  public void write(int b) throws IOException {
+    checkSizeAndGrow(Bytes.SIZEOF_BYTE);
+
+    buf.put((byte)b);
+  }
+
+ /**
+  * Writes the complete contents of this byte buffer output stream to
+  * the specified output stream argument.
+  *
+  * @param      out   the output stream to which to write the data.
+  * @exception  IOException  if an I/O error occurs.
+  */
+  public synchronized void writeTo(OutputStream out) throws IOException {
+    WritableByteChannel channel = Channels.newChannel(out);
+    channel.write(getByteBuffer());
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    checkSizeAndGrow(b.length);
+
+    buf.put(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkSizeAndGrow(len);
+
+    buf.put(b, off, len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // noop
+  }
+
+  @Override
+  public void close() throws IOException {
+    // noop again. heh
+  }
+
+  public byte[] toByteArray(int offset, int length) {
+    int position = buf.position();
+    byte[] chunk;
+
+    try {
+      buf.position(offset);
+
+      chunk = new byte[length];
+      buf.get(chunk, 0, length);
+    } finally {
+      buf.position(position);
+    }
+
+    return chunk;
+  }
+}
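
A minimal usage sketch (not part of the commit) exercising the API above: bytes are accumulated through the OutputStream interface and the buffer grows on demand, a slice can be copied out with toByteArray() without disturbing the write position, and writeTo() drains the whole buffer. Since writeTo() flips the underlying ByteBuffer, it should be the last call, as the comment on getByteBuffer() warns.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.util.ByteBufferOutputStream;

public class ByteBufferOutputStreamExample {
  public static void main(String[] args) throws IOException {
    // heap-backed buffer with a small initial capacity; it grows automatically
    ByteBufferOutputStream bbos = new ByteBufferOutputStream(16);
    DataOutputStream dos = new DataOutputStream(bbos);

    dos.write("MAGIC***".getBytes());          // 8 bytes
    dos.writeInt(42);                          // 4 bytes
    dos.writeLong(System.currentTimeMillis()); // 8 bytes, forces one regrow
    dos.flush();

    System.out.println("bytes buffered: " + bbos.size());  // 20

    // copy a slice (everything after the 8-byte magic) without moving the
    // write position, as the block-cache path in HFile.Writer does
    byte[] payload = bbos.toByteArray(8, bbos.size() - 8);
    System.out.println("payload length: " + payload.length);

    // drain the whole buffer to another stream; this flips the ByteBuffer,
    // so do it last
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    bbos.writeTo(sink);
    System.out.println("copied " + sink.size() + " bytes");
  }
}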