You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2011/03/04 01:48:20 UTC
svn commit: r1076907 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/io/hfile/
src/main/java/org/apache/hadoop/hbase/ipc/
src/main/java/org/apache/hadoop/hbase/util/
Author: rawson
Date: Fri Mar 4 00:48:19 2011
New Revision: 1076907
URL: http://svn.apache.org/viewvc?rev=1076907&view=rev
Log:
HBASE-3514 revert
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
Removed:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferOutputStream.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1076907&r1=1076906&r2=1076907&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Mar 4 00:48:19 2011
@@ -55,7 +55,6 @@ Release 0.91.0 - Unreleased
using a different classloader than system default
HBASE-3578 TableInputFormat does not setup the configuration for HBase
mapreduce jobs correctly (Dan Harvey via Stack)
- HBASE-3514 Speedup HFile.Writer append (Matteo via Ryan)
HBASE-3593 DemoClient.cpp is outdated
HBASE-3601 TestMasterFailover broken in TRUNK
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1076907&r1=1076906&r2=1076907&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Fri Mar 4 00:48:19 2011
@@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.io.HeapSi
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.ByteBloomFilter;
-import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompressionTest;
@@ -222,6 +221,9 @@ public class HFile {
// Used to ensure we write in order.
private final RawComparator<byte []> comparator;
+ // A stream made per block written.
+ private DataOutputStream out;
+
// Number of uncompressed bytes per block. Reinitialized when we start
// new block.
private int blocksize;
@@ -262,9 +264,9 @@ public class HFile {
// Block cache to optionally fill on write
private BlockCache blockCache;
- // Byte array output stream made per block written.
- private ByteBufferOutputStream bbos = null;
- private DataOutputStream bbosDos = null;
+ // Additional byte array output stream used to fill block cache
+ private ByteArrayOutputStream baos;
+ private DataOutputStream baosDos;
private int blockNumber = 0;
/**
@@ -358,7 +360,7 @@ public class HFile {
* @throws IOException
*/
private void checkBlockBoundary() throws IOException {
- if (bbosDos != null && bbosDos.size() < blocksize) return;
+ if (this.out != null && this.out.size() < blocksize) return;
finishBlock();
newBlock();
}
@@ -368,18 +370,11 @@ public class HFile {
* @throws IOException
*/
private void finishBlock() throws IOException {
- if (bbosDos == null) return;
-
- // Flush Data Output Stream
- bbosDos.flush();
-
- // Compress Data and write to output stream
- DataOutputStream compressStream = getCompressingStream();
- bbos.writeTo(compressStream);
- int size = releaseCompressingStream(compressStream);
-
+ if (this.out == null) return;
long now = System.currentTimeMillis();
+ int size = releaseCompressingStream(this.out);
+ this.out = null;
blockKeys.add(firstKey);
blockOffsets.add(Long.valueOf(blockBegin));
blockDataSizes.add(Integer.valueOf(size));
@@ -389,15 +384,14 @@ public class HFile {
writeOps++;
if (blockCache != null) {
- ByteBuffer blockToCache = bbos.getByteBuffer();
+ baosDos.flush();
+ byte [] bytes = baos.toByteArray();
+ ByteBuffer blockToCache = ByteBuffer.wrap(bytes, DATABLOCKMAGIC.length,
+ bytes.length - DATABLOCKMAGIC.length);
String blockName = path.toString() + blockNumber;
blockCache.cacheBlock(blockName, blockToCache);
+ baosDos.close();
}
-
- bbosDos.close();
- bbosDos = null;
- bbos = null;
-
blockNumber++;
}
@@ -408,16 +402,14 @@ public class HFile {
private void newBlock() throws IOException {
// This is where the next block begins.
blockBegin = outputStream.getPos();
-
+ this.out = getCompressingStream();
+ this.out.write(DATABLOCKMAGIC);
firstKey = null;
-
- // to avoid too many calls to realloc(),
- // pre-allocates the byte stream to the block size + 25%
- // only if blocksize is under 1Gb
- int bbosBlocksize = Math.max(blocksize, blocksize + (blocksize / 4));
- bbos = new ByteBufferOutputStream(bbosBlocksize);
- bbosDos = new DataOutputStream(bbos);
- bbosDos.write(DATABLOCKMAGIC);
+ if (blockCache != null) {
+ this.baos = new ByteArrayOutputStream();
+ this.baosDos = new DataOutputStream(baos);
+ this.baosDos.write(DATABLOCKMAGIC);
+ }
}
/*
@@ -475,7 +467,7 @@ public class HFile {
for (i = 0; i < metaNames.size(); ++i) {
// stop when the current key is greater than our own
byte[] cur = metaNames.get(i);
- if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length,
+ if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length,
key, 0, key.length) > 0) {
break;
}
@@ -571,12 +563,12 @@ public class HFile {
checkBlockBoundary();
}
// Write length of key and value and then actual key and value bytes.
- this.bbosDos.writeInt(klength);
+ this.out.writeInt(klength);
this.keylength += klength;
- this.bbosDos.writeInt(vlength);
+ this.out.writeInt(vlength);
this.valuelength += vlength;
- this.bbosDos.write(key, koffset, klength);
- this.bbosDos.write(value, voffset, vlength);
+ this.out.write(key, koffset, klength);
+ this.out.write(value, voffset, vlength);
// Are we the first key in this block?
if (this.firstKey == null) {
// Copy the key.
@@ -587,6 +579,13 @@ public class HFile {
this.lastKeyOffset = koffset;
this.lastKeyLength = klength;
this.entryCount ++;
+ // If we are pre-caching blocks on write, fill byte array stream
+ if (blockCache != null) {
+ this.baosDos.writeInt(klength);
+ this.baosDos.writeInt(vlength);
+ this.baosDos.write(key, koffset, klength);
+ this.baosDos.write(value, voffset, vlength);
+ }
}
/*
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java?rev=1076907&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java Fri Mar 4 00:48:19 2011
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Not thread safe!
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+ protected ByteBuffer buf;
+
+ public ByteBufferOutputStream(int capacity) {
+ this(capacity, false);
+ }
+
+ public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
+ if (useDirectByteBuffer) {
+ buf = ByteBuffer.allocateDirect(capacity);
+ } else {
+ buf = ByteBuffer.allocate(capacity);
+ }
+ }
+
+ public int size() {
+ return buf.position();
+ }
+
+ /**
+ * This flips the underlying BB so be sure to use it _last_!
+ * @return ByteBuffer
+ */
+ public ByteBuffer getByteBuffer() {
+ buf.flip();
+ return buf;
+ }
+
+ private void checkSizeAndGrow(int extra) {
+ if ( (buf.position() + extra) > buf.limit()) {
+ // size calculation is complex, because we could overflow negative,
+ // and/or not allocate enough space. this fixes that.
+ int newSize = (int)Math.min((((long)buf.capacity()) * 2),
+ (long)(Integer.MAX_VALUE));
+ newSize = Math.max(newSize, buf.position() + extra);
+
+ ByteBuffer newBuf = ByteBuffer.allocate(newSize);
+ buf.flip();
+ newBuf.put(buf);
+ buf = newBuf;
+ }
+ }
+
+ // OutputStream
+ @Override
+ public void write(int b) throws IOException {
+ checkSizeAndGrow(Bytes.SIZEOF_BYTE);
+
+ buf.put((byte)b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ checkSizeAndGrow(b.length);
+
+ buf.put(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ checkSizeAndGrow(len);
+
+ buf.put(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ // noop
+ }
+
+ @Override
+ public void close() throws IOException {
+ // noop again. heh
+ }
+}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1076907&r1=1076906&r2=1076907&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri Mar 4 00:48:19 2011
@@ -60,7 +60,6 @@ import org.apache.hadoop.ipc.VersionedPr
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;