Posted to commits@apex.apache.org by hs...@apache.org on 2015/11/07 01:57:07 UTC
[8/8] incubator-apex-malhar git commit: MLHR-1877 #resolve #comment
moved DTFile implementation from contrib to lib
MLHR-1877 #resolve #comment moved DTFile implementation from contrib to lib
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/02f48e1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/02f48e1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/02f48e1b
Branch: refs/heads/devel-3
Commit: 02f48e1b295920cb3c26b84eb802cfe18e3f9ea4
Parents: c1ebde9
Author: Chandni Singh <cs...@apache.org>
Authored: Fri Nov 6 16:40:12 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Fri Nov 6 16:41:11 2015 -0800
----------------------------------------------------------------------
.../hadoop/io/file/tfile/CacheManager.java | 183 --
.../apache/hadoop/io/file/tfile/DTBCFile.java | 1044 --------
.../org/apache/hadoop/io/file/tfile/DTFile.java | 2399 ------------------
.../tfile/ReusableByteArrayInputStream.java | 66 -
.../apache/hadoop/io/file/tfile/DTFileTest.java | 217 --
.../apache/hadoop/io/file/tfile/TestDTFile.java | 432 ----
.../io/file/tfile/TestDTFileByteArrays.java | 773 ------
.../io/file/tfile/TestTFileComparator2.java | 108 -
.../io/file/tfile/TestTFileComparators.java | 123 -
.../TestTFileJClassComparatorByteArrays.java | 59 -
.../tfile/TestTFileLzoCodecsByteArrays.java | 41 -
.../file/tfile/TestTFileLzoCodecsStreams.java | 39 -
.../tfile/TestTFileNoneCodecsByteArrays.java | 32 -
...ileNoneCodecsJClassComparatorByteArrays.java | 40 -
.../file/tfile/TestTFileNoneCodecsStreams.java | 32 -
.../hadoop/io/file/tfile/TestTFileSeek.java | 505 ----
.../file/tfile/TestTFileSeqFileComparison.java | 802 ------
.../hadoop/io/file/tfile/TestTFileSplit.java | 194 --
.../hadoop/io/file/tfile/TestTFileStreams.java | 423 ---
.../file/tfile/TestTFileUnsortedByteArrays.java | 239 --
.../hadoop/io/file/tfile/CacheManager.java | 185 ++
.../apache/hadoop/io/file/tfile/DTBCFile.java | 1044 ++++++++
.../org/apache/hadoop/io/file/tfile/DTFile.java | 2399 ++++++++++++++++++
.../tfile/ReusableByteArrayInputStream.java | 66 +
.../apache/hadoop/io/file/tfile/DTFileTest.java | 220 ++
.../apache/hadoop/io/file/tfile/TestDTFile.java | 432 ++++
.../io/file/tfile/TestDTFileByteArrays.java | 773 ++++++
.../io/file/tfile/TestTFileComparator2.java | 108 +
.../io/file/tfile/TestTFileComparators.java | 123 +
.../TestTFileJClassComparatorByteArrays.java | 59 +
.../tfile/TestTFileLzoCodecsByteArrays.java | 41 +
.../file/tfile/TestTFileLzoCodecsStreams.java | 39 +
.../tfile/TestTFileNoneCodecsByteArrays.java | 32 +
...ileNoneCodecsJClassComparatorByteArrays.java | 40 +
.../file/tfile/TestTFileNoneCodecsStreams.java | 32 +
.../hadoop/io/file/tfile/TestTFileSeek.java | 505 ++++
.../file/tfile/TestTFileSeqFileComparison.java | 802 ++++++
.../hadoop/io/file/tfile/TestTFileSplit.java | 194 ++
.../hadoop/io/file/tfile/TestTFileStreams.java | 423 +++
.../file/tfile/TestTFileUnsortedByteArrays.java | 239 ++
pom.xml | 1 +
41 files changed, 7757 insertions(+), 7751 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java b/contrib/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java
deleted file mode 100644
index 2c82d09..0000000
--- a/contrib/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.lang.management.ManagementFactory;
-import java.util.Collection;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.io.file.tfile.DTBCFile.Reader.BlockReader;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.Weigher;
-
-/**
- * A single global managed cache
- * Users can limit the cache size by the number of entries, memory size (in bytes) or a percentage of the total heap size
- * <br>
- * <br>
- * Please refer to <a href="https://code.google.com/p/guava-libraries/wiki/CachesExplained">Guava Cache</a> for details
- * <br>
- * <br>
- * It keeps {@link String} as the key and {@link BlockReader} as the value
- *
- * @since 2.0.0
- */
-public class CacheManager
-{
- public static final int STRING_OVERHEAD = 64;
-
- public static final int BLOCK_READER_OVERHEAD = 368;
-
- public static final float DEFAULT_HEAP_MEMORY_PERCENTAGE = 0.25f;
-
- private static Cache<String, BlockReader> singleCache;
-
- private static boolean enableStats = false;
-
- public static final Cache<String, BlockReader> buildCache(CacheBuilder builder) {
- if (singleCache != null) {
- singleCache.cleanUp();
- }
- if (enableStats)
- builder.recordStats();
- singleCache = builder.build();
- return singleCache;
- }
-
- /**
- * (Re)Create the cache by limiting the maximum entries
- * @param concurrencyLevel
- * @param initialCapacity
- * @param maximumSize
- * @return The cache.
- */
- public static final Cache<String, BlockReader> createCache(int concurrencyLevel, int initialCapacity, int maximumSize){
- CacheBuilder builder = CacheBuilder.newBuilder().
- concurrencyLevel(concurrencyLevel).
- initialCapacity(initialCapacity).
- maximumSize(maximumSize);
-
- return buildCache(builder);
- }
-
-
- /**
- * (Re)Create the cache by limiting the memory (in bytes)
- * @param concurrencyLevel
- * @param initialCapacity
- * @param maximumMemory
- * @return The cache.
- */
- public static final Cache<String, BlockReader> createCache(int concurrencyLevel,int initialCapacity, long maximumMemory){
-
- CacheBuilder builder = CacheBuilder.newBuilder().
- concurrencyLevel(concurrencyLevel).
- initialCapacity(initialCapacity).
- maximumWeight(maximumMemory).weigher(new KVWeigher());
-
- return buildCache(builder);
- }
-
- /**
- * (Re)Create the cache by limiting percentage of the total heap memory
- * @param concurrencyLevel
- * @param initialCapacity
- * @param heapMemPercentage
- * @return The cache.
- */
- public static final Cache<String, BlockReader> createCache(int concurrencyLevel,int initialCapacity, float heapMemPercentage){
- CacheBuilder builder = CacheBuilder.newBuilder().
- concurrencyLevel(concurrencyLevel).
- initialCapacity(initialCapacity).
- maximumWeight((long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * heapMemPercentage)).weigher(new KVWeigher());
- return buildCache(builder);
- }
-
- public static final void createDefaultCache(){
-
- long availableMemory = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * DEFAULT_HEAP_MEMORY_PERCENTAGE);
- CacheBuilder<String, BlockReader> builder = CacheBuilder.newBuilder().maximumWeight(availableMemory).weigher(new KVWeigher());
-
- singleCache = buildCache(builder);
- }
-
- public static final void put(String key, BlockReader blk){
- if (singleCache == null) {
- createDefaultCache();
- }
- singleCache.put(key, blk);
- }
-
- public static final BlockReader get(String key){
- if (singleCache == null) {
- return null;
- }
- return singleCache.getIfPresent(key);
- }
-
- public static final void invalidateKeys(Collection<String> keys)
- {
- if (singleCache != null)
- singleCache.invalidateAll(keys);
- }
-
- public static final long getCacheSize() {
- if (singleCache != null)
- return singleCache.size();
- return 0;
- }
-
- public static final class KVWeigher implements Weigher<String, BlockReader> {
-
- @Override
- public int weigh(String key, BlockReader value)
- {
- return (STRING_OVERHEAD + BLOCK_READER_OVERHEAD) +
- key.getBytes().length +
- value.getBlockDataInputStream().getBuf().length;
- }
-
- }
-
- @VisibleForTesting
- protected static Cache<String, BlockReader> getCache() {
- return singleCache;
- }
-
- public static final void setEnableStats(boolean enable) {
- enableStats = enable;
- }
-
- public static void main(String[] args)
- {
-
- // code to estimate the overhead of the key/value object instances
- // it depends on the HBase library (ClassSize)
-// System.out.println(ClassSize.estimateBase(BlockReader.class, true) +
-// ClassSize.estimateBase(Algorithm.class, true) +
-// ClassSize.estimateBase(RBlockState.class, true) +
-// ClassSize.estimateBase(ReusableByteArrayInputStream.class, true) +
-// ClassSize.estimateBase(BlockRegion.class, true));
-//
-// System.out.println(
-// ClassSize.estimateBase(String.class, true));
- }
-
-}
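
Aside for reviewers (not part of the patch): a minimal usage sketch of the CacheManager API above. It assumes code living in the same org.apache.hadoop.io.file.tfile package, since BlockReader is nested inside the package-private DTBCFile; the class name, cache key and sizing values are made up for illustration.

package org.apache.hadoop.io.file.tfile; // same package: BlockReader is not public

import org.apache.hadoop.io.file.tfile.DTBCFile.Reader.BlockReader;

class CacheManagerUsageSketch
{
  static void sizeTheCache()
  {
    // Each call (re)builds the shared singleton, so pick exactly one strategy;
    // the type of the third argument selects the overload.
    CacheManager.createCache(4, 1024, 10000);             // int   -> max number of entries
    CacheManager.createCache(4, 1024, 64L * 1024 * 1024); // long  -> max bytes, weighed by KVWeigher
    CacheManager.createCache(4, 1024, 0.25f);             // float -> fraction of the max heap
  }

  static BlockReader lookup(String cacheKey)
  {
    // get() returns null on a miss or before any cache exists;
    // put() lazily creates the default cache (25% of heap) on first use.
    return CacheManager.get(cacheKey);
  }
}

KVWeigher charges each entry a fixed overhead (STRING_OVERHEAD + BLOCK_READER_OVERHEAD) plus the key bytes and the decompressed block buffer, so the long overload effectively bounds the amount of decompressed block data held in memory.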
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java b/contrib/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java
deleted file mode 100644
index 779b0f0..0000000
--- a/contrib/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java
+++ /dev/null
@@ -1,1044 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.io.file.tfile;
-
-import java.io.Closeable;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.file.tfile.CompareUtils.Scalar;
-import org.apache.hadoop.io.file.tfile.Utils.Version;
-import org.apache.hadoop.io.file.tfile.CompareUtils.ScalarComparator;
-import org.apache.hadoop.io.file.tfile.CompareUtils.ScalarLong;
-import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
-
-
-/**
- *
- * <ul>
- * <li>The file format of DTFile is the same as {@link TFile}, with a different reader implementation.
- * It reads data block by block and caches the binary block data in memory to speed up random reads.
- *
- * <li>The public API of {@link Reader} is the same as in the {@link TFile} {@link org.apache.hadoop.io.file.tfile.TFile.Reader} implementation.
- * In addition, it provides the getBlockBuffer(), getKeyOffset(), getKeyLength(), getValueOffset() and getValueLength() methods
- * to expose the raw block, key and value data to the user, avoiding unnecessary internal/external data copies
- *
- * <li>In performance tests, it shows no difference in sequential reads and is 20x faster in random reads (if most of them hit the in-memory cache)
- * </ul>
- *
- * Block Compressed file, the underlying physical storage layer for TFile.
- * BCFile provides the basic block level compression for the data block and meta
- * blocks. It is separated from TFile as it may be used for other
- * block-compressed file implementations.
- *
- * @since 2.0.0
- */
-final class DTBCFile {
- // the current version of the BCFile impl; increment it (major or minor)
- // when enough changes are made
- static final Version API_VERSION = new Version((short) 1, (short) 0);
- static final Log LOG = LogFactory.getLog(DTBCFile.class);
-
- /**
- * Prevent the instantiation of BCFile objects.
- */
- private DTBCFile() {
- // nothing
- }
-
- /**
- * BCFile writer, the entry point for creating a new BCFile.
- */
- static public class Writer implements Closeable {
- private final FSDataOutputStream out;
- private final Configuration conf;
- // the single meta block containing index of compressed data blocks
- final DataIndex dataIndex;
- // index for meta blocks
- final MetaIndex metaIndex;
- boolean blkInProgress = false;
- private boolean metaBlkSeen = false;
- private boolean closed = false;
- long errorCount = 0;
- // reusable buffers.
- private BytesWritable fsOutputBuffer;
-
- /**
- * Call-back interface to register a block after a block is closed.
- */
- private static interface BlockRegister {
- /**
- * Register a block that is fully closed.
- *
- * @param raw
- * The size of block in terms of uncompressed bytes.
- * @param offsetStart
- * The start offset of the block.
- * @param offsetEnd
- * One byte after the end of the block. Compressed block size is
- * offsetEnd - offsetStart.
- */
- public void register(long raw, long offsetStart, long offsetEnd);
- }
-
- /**
- * Intermediate class that maintains the state of a Writable Compression
- * Block.
- */
- private static final class WBlockState {
- private final Algorithm compressAlgo;
- private Compressor compressor; // !null only if using native
- // Hadoop compression
- private final FSDataOutputStream fsOut;
- private final long posStart;
- private final SimpleBufferedOutputStream fsBufferedOutput;
- private OutputStream out;
-
- /**
- * @param compressionAlgo
- * The compression algorithm to be used for compression.
- * @throws IOException
- */
- public WBlockState(Algorithm compressionAlgo, FSDataOutputStream fsOut,
- BytesWritable fsOutputBuffer, Configuration conf) throws IOException {
- this.compressAlgo = compressionAlgo;
- this.fsOut = fsOut;
- this.posStart = fsOut.getPos();
-
- fsOutputBuffer.setCapacity(DTFile.getFSOutputBufferSize(conf));
-
- this.fsBufferedOutput =
- new SimpleBufferedOutputStream(this.fsOut, fsOutputBuffer.getBytes());
- this.compressor = compressAlgo.getCompressor();
-
- try {
- this.out =
- compressionAlgo.createCompressionStream(fsBufferedOutput,
- compressor, 0);
- } catch (IOException e) {
- compressAlgo.returnCompressor(compressor);
- throw e;
- }
- }
-
- /**
- * Get the output stream for BlockAppender's consumption.
- *
- * @return the output stream suitable for writing block data.
- */
- OutputStream getOutputStream() {
- return out;
- }
-
- /**
- * Get the current position in file.
- *
- * @return The current byte offset in underlying file.
- * @throws IOException
- */
- long getCurrentPos() throws IOException {
- return fsOut.getPos() + fsBufferedOutput.size();
- }
-
- long getStartPos() {
- return posStart;
- }
-
- /**
- * Current size of compressed data.
- *
- * @return current size of the compressed data in bytes.
- * @throws IOException
- */
- long getCompressedSize() throws IOException {
- long ret = getCurrentPos() - posStart;
- return ret;
- }
-
- /**
- * Finishing up the current block.
- */
- public void finish() throws IOException {
- try {
- if (out != null) {
- out.flush();
- out = null;
- }
- } finally {
- compressAlgo.returnCompressor(compressor);
- compressor = null;
- }
- }
- }
-
- /**
- * Access point to stuff data into a block.
- *
- * TODO: Change DataOutputStream to something else that tracks the size as
- * long instead of int. Currently, we will wrap around if the row block size
- * is greater than 4GB.
- */
- public class BlockAppender extends DataOutputStream {
- private final BlockRegister blockRegister;
- private final WBlockState wBlkState;
- @SuppressWarnings("hiding")
- private boolean closed = false;
-
- /**
- * Constructor
- *
- * @param register
- * the block register, which is called when the block is closed.
- * @param wbs
- * The writable compression block state.
- */
- BlockAppender(BlockRegister register, WBlockState wbs) {
- super(wbs.getOutputStream());
- this.blockRegister = register;
- this.wBlkState = wbs;
- }
-
- /**
- * Get the raw size of the block.
- *
- * @return the number of uncompressed bytes written through the
- * BlockAppender so far.
- * @throws IOException
- */
- public long getRawSize() throws IOException {
- /**
- * Expecting the size() of a block not to exceed 4GB. The int returned
- * by size() wraps to a negative value once it exceeds 2GB, so we mask
- * it back to an unsigned long.
- */
- return size() & 0x00000000ffffffffL;
- }
-
- /**
- * Get the compressed size of the block in progress.
- *
- * @return the number of compressed bytes written to the underlying FS
- * file. The size may be smaller than the actual size needed to
- * compress all the data written, due to internal buffering inside
- * the compressor.
- * @throws IOException
- */
- public long getCompressedSize() throws IOException {
- return wBlkState.getCompressedSize();
- }
-
- @Override
- public void flush() {
- // The downstream is a special kind of stream that finishes a
- // compression block upon flush. So we disable flush() here.
- }
-
- /**
- * Signaling the end of write to the block. The block register will be
- * called for registering the finished block.
- */
- @Override
- public void close() throws IOException {
- if (closed == true) {
- return;
- }
- try {
- ++errorCount;
- wBlkState.finish();
- blockRegister.register(getRawSize(), wBlkState.getStartPos(),
- wBlkState.getCurrentPos());
- --errorCount;
- } finally {
- closed = true;
- blkInProgress = false;
- }
- }
- }
-
- /**
- * Constructor
- *
- * @param fout
- * FS output stream.
- * @param compressionName
- * Name of the compression algorithm, which will be used for all
- * data blocks.
- * @throws IOException
- * @see Compression#getSupportedAlgorithms
- */
- public Writer(FSDataOutputStream fout, String compressionName,
- Configuration conf) throws IOException {
- if (fout.getPos() != 0) {
- throw new IOException("Output file not at zero offset.");
- }
-
- this.out = fout;
- this.conf = conf;
- dataIndex = new DataIndex(compressionName);
- metaIndex = new MetaIndex();
- fsOutputBuffer = new BytesWritable();
- Magic.write(fout);
- }
-
- /**
- * Close the BCFile Writer. Attempting to use the Writer after calling
- * <code>close</code> is not allowed and may lead to undetermined results.
- */
- @Override
- public void close() throws IOException {
- if (closed == true) {
- return;
- }
-
- try {
- if (errorCount == 0) {
- if (blkInProgress == true) {
- throw new IllegalStateException(
- "Close() called with active block appender.");
- }
-
- // add metaBCFileIndex to metaIndex as the last meta block
- BlockAppender appender =
- prepareMetaBlock(DataIndex.BLOCK_NAME,
- getDefaultCompressionAlgorithm());
- try {
- dataIndex.write(appender);
- } finally {
- appender.close();
- }
-
- long offsetIndexMeta = out.getPos();
- metaIndex.write(out);
-
- // Meta Index and the trailing section are written out directly.
- out.writeLong(offsetIndexMeta);
-
- API_VERSION.write(out);
- Magic.write(out);
- out.flush();
- }
- } finally {
- closed = true;
- }
- }
-
- private Algorithm getDefaultCompressionAlgorithm() {
- return dataIndex.getDefaultCompressionAlgorithm();
- }
-
- private BlockAppender prepareMetaBlock(String name, Algorithm compressAlgo)
- throws IOException, MetaBlockAlreadyExists {
- if (blkInProgress == true) {
- throw new IllegalStateException(
- "Cannot create Meta Block until previous block is closed.");
- }
-
- if (metaIndex.getMetaByName(name) != null) {
- throw new MetaBlockAlreadyExists("name=" + name);
- }
-
- MetaBlockRegister mbr = new MetaBlockRegister(name, compressAlgo);
- WBlockState wbs =
- new WBlockState(compressAlgo, out, fsOutputBuffer, conf);
- BlockAppender ba = new BlockAppender(mbr, wbs);
- blkInProgress = true;
- metaBlkSeen = true;
- return ba;
- }
-
- /**
- * Create a Meta Block and obtain an output stream for adding data into the
- * block. There can only be one BlockAppender stream active at any time.
- * Regular Blocks may not be created after the first Meta Blocks. The caller
- * must call BlockAppender.close() to conclude the block creation.
- *
- * @param name
- * The name of the Meta Block. The name must not conflict with
- * existing Meta Blocks.
- * @param compressionName
- * The name of the compression algorithm to be used.
- * @return The BlockAppender stream
- * @throws IOException
- * @throws MetaBlockAlreadyExists
- * If the meta block with the name already exists.
- */
- public BlockAppender prepareMetaBlock(String name, String compressionName)
- throws IOException, MetaBlockAlreadyExists {
- return prepareMetaBlock(name, Compression
- .getCompressionAlgorithmByName(compressionName));
- }
-
- /**
- * Create a Meta Block and obtain an output stream for adding data into the
- * block. The Meta Block will be compressed with the same compression
- * algorithm as data blocks. There can only be one BlockAppender stream
- * active at any time. Regular Blocks may not be created after the first
- * Meta Blocks. The caller must call BlockAppender.close() to conclude the
- * block creation.
- *
- * @param name
- * The name of the Meta Block. The name must not conflict with
- * existing Meta Blocks.
- * @return The BlockAppender stream
- * @throws MetaBlockAlreadyExists
- * If the meta block with the name already exists.
- * @throws IOException
- */
- public BlockAppender prepareMetaBlock(String name) throws IOException,
- MetaBlockAlreadyExists {
- return prepareMetaBlock(name, getDefaultCompressionAlgorithm());
- }
-
- /**
- * Create a Data Block and obtain an output stream for adding data into the
- * block. There can only be one BlockAppender stream active at any time.
- * Data Blocks may not be created after the first Meta Blocks. The caller
- * must call BlockAppender.close() to conclude the block creation.
- *
- * @return The BlockAppender stream
- * @throws IOException
- */
- public BlockAppender prepareDataBlock() throws IOException {
- if (blkInProgress == true) {
- throw new IllegalStateException(
- "Cannot create Data Block until previous block is closed.");
- }
-
- if (metaBlkSeen == true) {
- throw new IllegalStateException(
- "Cannot create Data Block after Meta Blocks.");
- }
-
- DataBlockRegister dbr = new DataBlockRegister();
-
- WBlockState wbs =
- new WBlockState(getDefaultCompressionAlgorithm(), out,
- fsOutputBuffer, conf);
- BlockAppender ba = new BlockAppender(dbr, wbs);
- blkInProgress = true;
- return ba;
- }
-
- /**
- * Callback to make sure a meta block is added to the internal list when its
- * stream is closed.
- */
- private class MetaBlockRegister implements BlockRegister {
- private final String name;
- private final Algorithm compressAlgo;
-
- MetaBlockRegister(String name, Algorithm compressAlgo) {
- this.name = name;
- this.compressAlgo = compressAlgo;
- }
-
- @Override
- public void register(long raw, long begin, long end) {
- metaIndex.addEntry(new MetaIndexEntry(name, compressAlgo,
- new BlockRegion(begin, end - begin, raw)));
- }
- }
-
- /**
- * Callback to make sure a data block is added to the internal list when
- * it's being closed.
- *
- */
- private class DataBlockRegister implements BlockRegister {
- DataBlockRegister() {
- // do nothing
- }
-
- @Override
- public void register(long raw, long begin, long end) {
- dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
- }
- }
- }
-
- /**
- * BCFile Reader, interface to read the file's data and meta blocks.
- */
- static public class Reader implements Closeable {
- private final FSDataInputStream in;
- private final Configuration conf;
- final DataIndex dataIndex;
- // Index for meta blocks
- final MetaIndex metaIndex;
- final Version version;
- //
- private ByteArrayOutputStream baos;
- private ArrayList<String> cacheKeys;
-
- public ArrayList<String> getCacheKeys()
- {
- return cacheKeys;
- }
-
- /**
- * Intermediate class that maintains the state of a Readable Compression
- * Block.
- */
- static private final class RBlockState {
- private final Algorithm compressAlgo;
- private final ReusableByteArrayInputStream rbain;
- private final BlockRegion region;
-
- public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region, Configuration conf, Reader r) throws IOException
- {
- this.compressAlgo = compressionAlgo;
- Decompressor decompressor = compressionAlgo.getDecompressor();
- this.region = region;
- try {
-
- InputStream in = compressAlgo.createDecompressionStream(new BoundedRangeFileInputStream(fsin, region.getOffset(), region.getCompressedSize()), decompressor, DTFile.getFSInputBufferSize(conf));
- int l = 1;
- r.baos.reset();
- byte[] buf = new byte[DTFile.getFSInputBufferSize(conf)];
- while (l >= 0) {
- l = in.read(buf);
- if (l > 0) {
- r.baos.write(buf, 0, l);
- }
- }
- // keep the decompressed block data in memory for the cache
- byte[] blockData = r.baos.toByteArray();
- rbain = new ReusableByteArrayInputStream(blockData);
- } catch (IOException e) {
- compressAlgo.returnDecompressor(decompressor);
- throw e;
- }
-
- }
-
- /**
- * Get the input stream holding the uncompressed block data.
- *
- * @return the input stream suitable for reading block data.
- */
- public ReusableByteArrayInputStream getInputStream() {
- return rbain;
- }
-
- public String getCompressionName() {
- return compressAlgo.getName();
- }
-
- public BlockRegion getBlockRegion() {
- return region;
- }
-
- public void finish() throws IOException {
- try {
- rbain.close();
- } finally {
- }
- }
-
- public void renew()
- {
- rbain.renew();
- }
- }
-
- /**
- * Access point to read a block.
- */
- public static class BlockReader extends DataInputStream {
- private final RBlockState rBlkState;
- private boolean closed = false;
-
- private ReusableByteArrayInputStream wrappedInputStream = null;
-
- BlockReader(RBlockState rbs) {
- super(rbs.getInputStream());
- rBlkState = rbs;
- wrappedInputStream = rbs.getInputStream();
- }
-
- /**
- * Finishing reading the block. Release all resources.
- */
- @Override
- public void close() throws IOException {
- if (closed == true) {
- return;
- }
- try {
- // Do not set rBlkState to null. People may access stats after calling
- // close().
- rBlkState.finish();
- } finally {
- closed = true;
- }
- }
-
- /**
- * Get the name of the compression algorithm used to compress the block.
- *
- * @return name of the compression algorithm.
- */
- public String getCompressionName() {
- return rBlkState.getCompressionName();
- }
-
- /**
- * Get the uncompressed size of the block.
- *
- * @return uncompressed size of the block.
- */
- public long getRawSize() {
- return rBlkState.getBlockRegion().getRawSize();
- }
-
- /**
- * Get the compressed size of the block.
- *
- * @return compressed size of the block.
- */
- public long getCompressedSize() {
- return rBlkState.getBlockRegion().getCompressedSize();
- }
-
- /**
- * Get the starting position of the block in the file.
- *
- * @return the starting position of the block in the file.
- */
- public long getStartPos() {
- return rBlkState.getBlockRegion().getOffset();
- }
-
- public void renew()
- {
- closed = false;
- rBlkState.renew();
- }
-
- public ReusableByteArrayInputStream getBlockDataInputStream()
- {
- return wrappedInputStream;
- }
- }
-
- /**
- * Constructor
- *
- * @param fin
- * FS input stream.
- * @param fileLength
- * Length of the corresponding file
- * @throws IOException
- */
- public Reader(FSDataInputStream fin, long fileLength, Configuration conf)
- throws IOException {
- this.in = fin;
- this.conf = conf;
- // A reader buffer to read the block
- baos = new ByteArrayOutputStream(DTFile.getFSInputBufferSize(conf) * 2);
- this.cacheKeys = new ArrayList<String>();
- // move the cursor to the beginning of the tail, containing: offset to the
- // meta block index, version and magic
- fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE
- / Byte.SIZE);
- long offsetIndexMeta = fin.readLong();
- version = new Version(fin);
- Magic.readAndVerify(fin);
-
- if (!version.compatibleWith(DTBCFile.API_VERSION)) {
- throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
- }
-
- // read meta index
- fin.seek(offsetIndexMeta);
- metaIndex = new MetaIndex(fin);
-
- // read data:BCFile.index, the data block index
- BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
- try {
- dataIndex = new DataIndex(blockR);
- } finally {
- blockR.close();
- }
- }
-
- /**
- * Get the name of the default compression algorithm.
- *
- * @return the name of the default compression algorithm.
- */
- public String getDefaultCompressionName() {
- return dataIndex.getDefaultCompressionAlgorithm().getName();
- }
-
- /**
- * Get version of BCFile file being read.
- *
- * @return version of BCFile file being read.
- */
- public Version getBCFileVersion() {
- return version;
- }
-
- /**
- * Get version of BCFile API.
- *
- * @return version of BCFile API.
- */
- public Version getAPIVersion() {
- return API_VERSION;
- }
-
- /**
- * Finishing reading the BCFile. Release all resources.
- */
- @Override
- public void close() {
- // Delete buffers in cache for this reader.
- CacheManager.invalidateKeys(cacheKeys);
- cacheKeys.clear();
- }
-
- /**
- * Get the number of data blocks.
- *
- * @return the number of data blocks.
- */
- public int getBlockCount() {
- return dataIndex.getBlockRegionList().size();
- }
-
- /**
- * Stream access to a Meta Block.
- *
- * @param name
- * meta block name
- * @return BlockReader input stream for reading the meta block.
- * @throws IOException
- * @throws MetaBlockDoesNotExist
- * The Meta Block with the given name does not exist.
- */
- public BlockReader getMetaBlock(String name) throws IOException,
- MetaBlockDoesNotExist {
- MetaIndexEntry imeBCIndex = metaIndex.getMetaByName(name);
- if (imeBCIndex == null) {
- throw new MetaBlockDoesNotExist("name=" + name);
- }
-
- BlockRegion region = imeBCIndex.getRegion();
- return createReader(imeBCIndex.getCompressionAlgorithm(), region);
- }
-
- /**
- * Stream access to a Data Block.
- *
- * @param blockIndex
- * 0-based data block index.
- * @return BlockReader input stream for reading the data block.
- * @throws IOException
- */
- public BlockReader getDataBlock(int blockIndex) throws IOException {
- if (blockIndex < 0 || blockIndex >= getBlockCount()) {
- throw new IndexOutOfBoundsException(String.format(
- "blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount()));
- }
-
- BlockRegion region = dataIndex.getBlockRegionList().get(blockIndex);
- return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
- }
-
- private BlockReader createReader(Algorithm compressAlgo, BlockRegion region)
- throws IOException {
- BlockReader br = CacheManager.get(region.getOffset() + this.toString());
- if (br == null) {
- RBlockState rbs = new RBlockState(compressAlgo, in, region, conf, this);
- br = new BlockReader(rbs);
- String cacheKey = region.getOffset() + this.toString();
- CacheManager.put(cacheKey, br);
- cacheKeys.add(cacheKey);
- } else {
- br.renew();
- }
- return br;
- }
-
- /**
- * Find the smallest Block index whose starting offset is greater than or
- * equal to the specified offset.
- *
- * @param offset
- * User-specific offset.
- * @return the index to the data Block if such block exists; or -1
- * otherwise.
- */
- public int getBlockIndexNear(long offset) {
- ArrayList<BlockRegion> list = dataIndex.getBlockRegionList();
- int idx =
- Utils
- .lowerBound(list, new ScalarLong(offset), new ScalarComparator());
-
- if (idx == list.size()) {
- return -1;
- }
-
- return idx;
- }
- }
-
- /**
- * Index for all Meta blocks.
- */
- static class MetaIndex {
- // use a tree map, for getting a meta block entry by name
- final Map<String, MetaIndexEntry> index;
-
- // for write
- public MetaIndex() {
- index = new TreeMap<String, MetaIndexEntry>();
- }
-
- // for read, construct the map from the file
- public MetaIndex(DataInput in) throws IOException {
- int count = Utils.readVInt(in);
- index = new TreeMap<String, MetaIndexEntry>();
-
- for (int nx = 0; nx < count; nx++) {
- MetaIndexEntry indexEntry = new MetaIndexEntry(in);
- index.put(indexEntry.getMetaName(), indexEntry);
- }
- }
-
- public void addEntry(MetaIndexEntry indexEntry) {
- index.put(indexEntry.getMetaName(), indexEntry);
- }
-
- public MetaIndexEntry getMetaByName(String name) {
- return index.get(name);
- }
-
- public void write(DataOutput out) throws IOException {
- Utils.writeVInt(out, index.size());
-
- for (MetaIndexEntry indexEntry : index.values()) {
- indexEntry.write(out);
- }
- }
- }
-
- /**
- * An entry describes a meta block in the MetaIndex.
- */
- static final class MetaIndexEntry {
- private final String metaName;
- private final Algorithm compressionAlgorithm;
- private final static String defaultPrefix = "data:";
-
- private final BlockRegion region;
-
- public MetaIndexEntry(DataInput in) throws IOException {
- String fullMetaName = Utils.readString(in);
- if (fullMetaName.startsWith(defaultPrefix)) {
- metaName =
- fullMetaName.substring(defaultPrefix.length(), fullMetaName
- .length());
- } else {
- throw new IOException("Corrupted Meta region Index");
- }
-
- compressionAlgorithm =
- Compression.getCompressionAlgorithmByName(Utils.readString(in));
- region = new BlockRegion(in);
- }
-
- public MetaIndexEntry(String metaName, Algorithm compressionAlgorithm,
- BlockRegion region) {
- this.metaName = metaName;
- this.compressionAlgorithm = compressionAlgorithm;
- this.region = region;
- }
-
- public String getMetaName() {
- return metaName;
- }
-
- public Algorithm getCompressionAlgorithm() {
- return compressionAlgorithm;
- }
-
- public BlockRegion getRegion() {
- return region;
- }
-
- public void write(DataOutput out) throws IOException {
- Utils.writeString(out, defaultPrefix + metaName);
- Utils.writeString(out, compressionAlgorithm.getName());
-
- region.write(out);
- }
- }
-
- /**
- * Index of all compressed data blocks.
- */
- static class DataIndex {
- final static String BLOCK_NAME = "BCFile.index";
-
- private final Algorithm defaultCompressionAlgorithm;
-
- // for data blocks, each entry specifies a block's offset, compressed size
- // and raw size
- private final ArrayList<BlockRegion> listRegions;
-
- // for read, deserialized from a file
- public DataIndex(DataInput in) throws IOException {
- defaultCompressionAlgorithm =
- Compression.getCompressionAlgorithmByName(Utils.readString(in));
-
- int n = Utils.readVInt(in);
- listRegions = new ArrayList<BlockRegion>(n);
-
- for (int i = 0; i < n; i++) {
- BlockRegion region = new BlockRegion(in);
- listRegions.add(region);
- }
- }
-
- // for write
- public DataIndex(String defaultCompressionAlgorithmName) {
- this.defaultCompressionAlgorithm =
- Compression
- .getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
- listRegions = new ArrayList<BlockRegion>();
- }
-
- public Algorithm getDefaultCompressionAlgorithm() {
- return defaultCompressionAlgorithm;
- }
-
- public ArrayList<BlockRegion> getBlockRegionList() {
- return listRegions;
- }
-
- public void addBlockRegion(BlockRegion region) {
- listRegions.add(region);
- }
-
- public void write(DataOutput out) throws IOException {
- Utils.writeString(out, defaultCompressionAlgorithm.getName());
-
- Utils.writeVInt(out, listRegions.size());
-
- for (BlockRegion region : listRegions) {
- region.write(out);
- }
- }
- }
-
- /**
- * Magic number uniquely identifying a BCFile in the header/footer.
- */
- static final class Magic {
- private final static byte[] AB_MAGIC_BCFILE =
- {
- // ... total of 16 bytes
- (byte) 0xd1, (byte) 0x11, (byte) 0xd3, (byte) 0x68, (byte) 0x91,
- (byte) 0xb5, (byte) 0xd7, (byte) 0xb6, (byte) 0x39, (byte) 0xdf,
- (byte) 0x41, (byte) 0x40, (byte) 0x92, (byte) 0xba, (byte) 0xe1,
- (byte) 0x50 };
-
- public static void readAndVerify(DataInput in) throws IOException {
- byte[] abMagic = new byte[size()];
- in.readFully(abMagic);
-
- // check against AB_MAGIC_BCFILE, if not matching, throw an
- // Exception
- if (!Arrays.equals(abMagic, AB_MAGIC_BCFILE)) {
- throw new IOException("Not a valid BCFile.");
- }
- }
-
- public static void write(DataOutput out) throws IOException {
- out.write(AB_MAGIC_BCFILE);
- }
-
- public static int size() {
- return AB_MAGIC_BCFILE.length;
- }
- }
-
- /**
- * Block region.
- */
- static final class BlockRegion implements Scalar {
- private final long offset;
- private final long compressedSize;
- private final long rawSize;
-
- public BlockRegion(DataInput in) throws IOException {
- offset = Utils.readVLong(in);
- compressedSize = Utils.readVLong(in);
- rawSize = Utils.readVLong(in);
- }
-
- public BlockRegion(long offset, long compressedSize, long rawSize) {
- this.offset = offset;
- this.compressedSize = compressedSize;
- this.rawSize = rawSize;
- }
-
- public void write(DataOutput out) throws IOException {
- Utils.writeVLong(out, offset);
- Utils.writeVLong(out, compressedSize);
- Utils.writeVLong(out, rawSize);
- }
-
- public long getOffset() {
- return offset;
- }
-
- public long getCompressedSize() {
- return compressedSize;
- }
-
- public long getRawSize() {
- return rawSize;
- }
-
- @Override
- public long magnitude() {
- return offset;
- }
- }
-}
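
For reviewers new to this layer, a write-and-read-back sketch against the Writer and Reader above (again, illustration only, not part of the patch): it assumes same-package code because DTBCFile is package-private, a Hadoop FileSystem/Path to write to, and the standard "gz" TFile codec name. Per the Reader constructor, the file tail is laid out as [8-byte meta index offset][4-byte version][16-byte magic], which is why it first seeks to fileLength - 16 - 4 - 8.

package org.apache.hadoop.io.file.tfile; // same package: DTBCFile is package-private

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.file.tfile.DTBCFile.Reader.BlockReader;
import org.apache.hadoop.io.file.tfile.DTBCFile.Writer.BlockAppender;

class DTBCFileRoundTripSketch
{
  static void roundTrip(FileSystem fs, Path path, Configuration conf) throws IOException
  {
    // Write a single compressed data block.
    FSDataOutputStream fout = fs.create(path);
    DTBCFile.Writer writer = new DTBCFile.Writer(fout, "gz", conf);
    try {
      BlockAppender appender = writer.prepareDataBlock(); // data blocks must precede meta blocks
      try {
        appender.writeUTF("hello block");
      } finally {
        appender.close(); // registers the region in the data index
      }
    } finally {
      writer.close(); // appends "BCFile.index", the meta index, version and magic
      fout.close();
    }

    // Read it back. getDataBlock() consults CacheManager first; a hit is
    // renew()ed rather than re-read and re-decompressed from the file.
    FSDataInputStream fin = fs.open(path);
    DTBCFile.Reader reader = new DTBCFile.Reader(fin, fs.getFileStatus(path).getLen(), conf);
    try {
      for (int i = 0; i < reader.getBlockCount(); i++) {
        BlockReader block = reader.getDataBlock(i);
        try {
          System.out.println("block " + i + ": " + block.readUTF()
              + " (raw=" + block.getRawSize()
              + ", compressed=" + block.getCompressedSize() + ")");
        } finally {
          block.close();
        }
      }
    } finally {
      reader.close(); // drops this reader's entries from the shared cache
      fin.close();
    }
  }
}

The size() & 0x00000000ffffffffL mask in BlockAppender.getRawSize() is the usual unsigned-int recovery: a block past 2GB drives the int counter negative, and the mask restores the true count for any block under 4GB.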