Posted to commits@tajo.apache.org by jh...@apache.org on 2013/11/21 07:53:49 UTC
[2/3] TAJO-200: Make RCFile compatible with Apache Hive. (jinho)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/effa7df6/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 245aba5..74e2192 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -18,6 +18,8 @@
package org.apache.tajo.storage.rcfile;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -26,9 +28,16 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import java.io.*;
import java.rmi.server.UID;
@@ -39,21 +48,21 @@ import java.util.Arrays;
* <code>RCFile</code>s, short for Record Columnar File, are flat files
* consisting of binary key/value pairs, which share much similarity with
* <code>SequenceFile</code>.
- *
+ * <p/>
* RCFile stores columns of a table in a record columnar way. It first
* partitions rows horizontally into row splits, and then it vertically
* partitions each row split in a columnar way. RCFile first stores the meta
* data of a row split, as the key part of a record, and all the data of a row
- * split as the value part. When writing, RCFile.Appender first holds records'
+ * split as the value part. When writing, RCFile.Writer first holds records'
* value bytes in memory, and determines a row split if the raw bytes size of
- * buffered records overflow a given parameter<tt>Appender.columnsBufferSize</tt>,
+ * buffered records overflows a given parameter <tt>Writer.columnsBufferSize</tt>,
* which can be set like: <code>conf.setInt(COLUMNS_BUFFER_SIZE_CONF_STR,
- 4 * 1024 * 1024)</code> .
+ * 4 * 1024 * 1024)</code>.
* <p>
* <code>RCFile</code> provides {@link Writer} and {@link Reader} classes for
* writing and reading, respectively.
* </p>
- *
+ * <p/>
* <p>
* RCFile stores columns of a table in a record columnar way. It first
* partitions rows horizontally into row splits, and then it vertically
@@ -61,21 +70,21 @@ import java.util.Arrays;
* data of a row split, as the key part of a record, and all the data of a row
* split as the value part.
* </p>
- *
+ * <p/>
* <p>
* RCFile compresses values in a more fine-grained manner than record-level
* compression. However, it does not yet support compressing the key part.
* The actual compression algorithm used to compress key and/or values can
* be specified by using the appropriate {@link CompressionCodec}.
* </p>
- *
+ * <p/>
* <p>
* The {@link Reader} is used to read and interpret the bytes of RCFile.
* </p>
- *
+ * <p/>
* <h4 id="Formats">RCFile Formats</h4>
- *
- *
+ * <p/>
+ * <p/>
* <h5 id="Header">RC Header</h5>
* <ul>
* <li>version - 3 bytes of magic header <b>RCF</b>, followed by 1 byte of
@@ -87,7 +96,7 @@ import java.util.Arrays;
* <li>metadata - {@link Metadata} for this file.</li>
* <li>sync - A sync marker to denote end of the header.</li>
* </ul>
- *
+ * <p/>
* <h5>RCFile Format</h5>
* <ul>
* <li><a href="#Header">Header</a></li>
@@ -117,26 +126,208 @@ import java.util.Arrays;
* </ul>
* </li>
* </ul>
+ * <p>
+ * <pre>
+ * {@code
+ * The following is a pseudo-BNF grammar for RCFile. Comments are prefixed
+ * with dashes:
+ *
+ * rcfile ::=
+ * <file-header>
+ * <rcfile-rowgroup>+
+ *
+ * file-header ::=
+ * <file-version-header>
+ * <file-key-class-name> (only exists if version is seq6)
+ * <file-value-class-name> (only exists if version is seq6)
+ * <file-is-compressed>
+ * <file-is-block-compressed> (only exists if version is seq6)
+ * [<file-compression-codec-class>]
+ * <file-header-metadata>
+ * <file-sync-hash>
+ *
+ * -- The normative RCFile implementation included with Hive is actually
+ * -- based on a modified version of Hadoop's SequenceFile code. Some
+ * -- things which should have been modified were not, including the code
+ * -- that writes out the file version header. Consequently, RCFile and
+ * -- SequenceFile originally shared the same version header. A newer
+ * -- release has created a unique version string.
+ *
+ * file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
+ * | Byte[4] {'R', 'C', 'F', 1}
+ *
+ * -- The name of the Java class responsible for reading the key buffer
+ * -- component of the rowgroup.
+ *
+ * file-key-class-name ::=
+ * Text {"org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"}
+ *
+ * -- The name of the Java class responsible for reading the value buffer
+ * -- component of the rowgroup.
+ *
+ * file-value-class-name ::=
+ * Text {"org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer"}
+ *
+ * -- Boolean variable indicating whether or not the file uses compression
+ * -- for the key and column buffer sections.
+ *
+ * file-is-compressed ::= Byte[1]
+ *
+ * -- A boolean field indicating whether or not the file is block compressed.
+ * -- This field is *always* false. According to comments in the original
+ * -- RCFile implementation this field was retained for backwards
+ * -- compatibility with the SequenceFile format.
+ *
+ * file-is-block-compressed ::= Byte[1] {false}
+ *
+ * -- The Java class name of the compression codec iff <file-is-compressed>
+ * -- is true. The named class must implement
+ * -- org.apache.hadoop.io.compress.CompressionCodec.
+ * -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
+ *
+ * file-compression-codec-class ::= Text
+ *
+ * -- A collection of key-value pairs defining metadata values for the
+ * -- file. The Map is serialized using standard JDK serialization, i.e.
+ * -- an Int corresponding to the number of key-value pairs, followed by
+ * -- Text key and value pairs. The following metadata properties are
+ * -- mandatory for all RCFiles:
+ * --
+ * -- hive.io.rcfile.column.number: the number of columns in the RCFile
+ *
+ * file-header-metadata ::= Map<Text, Text>
+ *
+ * -- A 16 byte marker that is generated by the writer. This marker appears
+ * -- at regular intervals at the beginning of rowgroup-headers, and is
+ * -- intended to enable readers to skip over corrupted rowgroups.
+ *
+ * file-sync-hash ::= Byte[16]
+ *
+ * -- Each row group is split into three sections: a header, a set of
+ * -- key buffers, and a set of column buffers. The header section includes
+ * -- an optional sync hash, information about the size of the row group, and
+ * -- the total number of rows in the row group. Each key buffer
+ * -- consists of run-length encoding data which is used to decode
+ * -- the length and offsets of individual fields in the corresponding column
+ * -- buffer.
+ *
+ * rcfile-rowgroup ::=
+ * <rowgroup-header>
+ * <rowgroup-key-data>
+ * <rowgroup-column-buffers>
+ *
+ * rowgroup-header ::=
+ * [<rowgroup-sync-marker>, <rowgroup-sync-hash>]
+ * <rowgroup-record-length>
+ * <rowgroup-key-length>
+ * <rowgroup-compressed-key-length>
+ *
+ * -- rowgroup-key-data is compressed if the column data is compressed.
+ * rowgroup-key-data ::=
+ * <rowgroup-num-rows>
+ * <rowgroup-key-buffers>
+ *
+ * -- An integer (always -1) signaling the beginning of a sync-hash
+ * -- field.
+ *
+ * rowgroup-sync-marker ::= Int
+ *
+ * -- A 16 byte sync field. This must match the <file-sync-hash> value read
+ * -- in the file header.
+ *
+ * rowgroup-sync-hash ::= Byte[16]
+ *
+ * -- The record-length is the sum of the number of bytes used to store
+ * -- the key and column parts, i.e. it is the total length of the current
+ * -- rowgroup.
+ *
+ * rowgroup-record-length ::= Int
+ *
+ * -- Total length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-key-length ::= Int
+ *
+ * -- Total compressed length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-compressed-key-length ::= Int
+ *
+ * -- Number of rows in the current rowgroup.
+ *
+ * rowgroup-num-rows ::= VInt
+ *
+ * -- One or more column key buffers corresponding to each column
+ * -- in the RCFile.
+ *
+ * rowgroup-key-buffers ::= <rowgroup-key-buffer>+
+ *
+ * -- Data in each column buffer is stored using a run-length
+ * -- encoding scheme that is intended to reduce the cost of
+ * -- repeated column field values. This mechanism is described
+ * -- in more detail in the following entries.
+ *
+ * rowgroup-key-buffer ::=
+ * <column-buffer-length>
+ * <column-buffer-uncompressed-length>
+ * <column-key-buffer-length>
+ * <column-key-buffer>
+ *
+ * -- The serialized length on disk of the corresponding column buffer.
+ *
+ * column-buffer-length ::= VInt
+ *
+ * -- The uncompressed length of the corresponding column buffer. This
+ * -- is equivalent to column-buffer-length if the RCFile is not compressed.
+ *
+ * column-buffer-uncompressed-length ::= VInt
+ *
+ * -- The length in bytes of the current column key buffer
+ *
+ * column-key-buffer-length ::= VInt
+ *
+ * -- The column-key-buffer contains a sequence of serialized VInt values
+ * -- corresponding to the byte lengths of the serialized column fields
+ * -- in the corresponding rowgroup-column-buffer. For example, consider
+ * -- an integer column that contains the consecutive values 1, 2, 3, 44.
+ * -- The RCFile format stores these values as strings in the column buffer,
+ * -- e.g. "12344". The length of each column field is recorded in
+ * -- the column-key-buffer as a sequence of VInts: 1,1,1,2. However,
+ * -- if the same length occurs repeatedly, then we replace repeated
+ * -- run lengths with the complement (i.e. negative) of the number of
+ * -- repetitions, so 1,1,1,2 becomes 1,~2,2.
+ *
+ * column-key-buffer ::= Byte[column-key-buffer-length]
+ *
+ * rowgroup-column-buffers ::= <rowgroup-column-buffer>+
+ *
+ * -- RCFile stores all column data as strings regardless of the
+ * -- underlying column type. The strings are neither length-prefixed nor
+ * -- null-terminated, and decoding them into individual fields requires
+ * -- the use of the run-length information contained in the corresponding
+ * -- column-key-buffer.
+ *
+ * rowgroup-column-buffer ::= Byte[column-buffer-length]
+ *
+ * Byte ::= An eight-bit byte
*
+ * VInt ::= Variable length integer. The high-order bit of each byte
+ * indicates whether more bytes remain to be read. The low-order seven
+ * bits are appended as increasingly more significant bits in the
+ * resulting integer value.
+ *
+ * Int ::= A four-byte integer in big-endian format.
+ *
+ * Text ::= VInt, Chars (Length prefixed UTF-8 characters)
+ * }
+ * </pre>
+ * </p>
*/
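For illustration, here is a minimal sketch of a decoder for the VInt encoding as described by the grammar above. It assumes the base-128 scheme stated there; note that Hadoop's WritableUtils actually implements a different, length-prefixed variable-length encoding, so this follows the javadoc description only, and the helper name is hypothetical.

    // Decodes the documented VInt form: the low-order seven bits of each
    // byte are appended as increasingly significant bits of the result;
    // a set high-order bit means more bytes remain to be read.
    static int readDocumentedVInt(java.io.DataInput in) throws java.io.IOException {
      int result = 0;
      int shift = 0;
      while (true) {
        byte b = in.readByte();
        result |= (b & 0x7f) << shift;   // append low seven bits
        if ((b & 0x80) == 0) {           // high bit clear: last byte
          return result;
        }
        shift += 7;
      }
    }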
public class RCFile {
private static final Log LOG = LogFactory.getLog(RCFile.class);
public static final String RECORD_INTERVAL_CONF_STR = "hive.io.rcfile.record.interval";
-
public static final String COLUMN_NUMBER_METADATA_STR = "hive.io.rcfile.column.number";
- public static final String COLUMN_NUMBER_CONF_STR = "hive.io.rcfile.column.number.conf";
-
- public static final String TOLERATE_CORRUPTIONS_CONF_STR =
- "hive.io.rcfile.tolerate.corruptions";
-
- // HACK: We actually need BlockMissingException, but that is not available
- // in all hadoop versions.
- public static final String BLOCK_MISSING_MESSAGE =
- "Could not obtain block";
-
// All of the versions should be placed in this list.
private static final int ORIGINAL_VERSION = 0; // version with SEQ
private static final int NEW_MAGIC_VERSION = 1; // version with RCF
@@ -144,31 +335,35 @@ public class RCFile {
private static final int CURRENT_VERSION = NEW_MAGIC_VERSION;
// The first version of RCFile used the sequence file header.
- private static final byte[] ORIGINAL_MAGIC = new byte[] {
+ private static final byte[] ORIGINAL_MAGIC = new byte[]{
(byte) 'S', (byte) 'E', (byte) 'Q'};
// the version that was included with the original magic, which is mapped
// into ORIGINAL_VERSION
private static final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;
- private static final byte[] ORIGINAL_MAGIC_VERSION = new byte[] {
+ private static final byte[] ORIGINAL_MAGIC_VERSION = new byte[]{
(byte) 'S', (byte) 'E', (byte) 'Q', ORIGINAL_MAGIC_VERSION_WITH_METADATA
};
// The 'magic' bytes at the beginning of the RCFile
- private static final byte[] MAGIC = new byte[] {
+ private static final byte[] MAGIC = new byte[]{
(byte) 'R', (byte) 'C', (byte) 'F'};
private static final int SYNC_ESCAPE = -1; // "length" of sync entries
private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash
- /** The number of bytes between sync points. */
+ /**
+ * The number of bytes between sync points.
+ */
public static final int SYNC_INTERVAL = 100 * SYNC_SIZE;
+ public static final String NULL = "rcfile.null";
+ public static final String SERDE = "rcfile.serde";
/**
* KeyBuffer is the key of each record in RCFile. Its on-disk layout is as
* below:
- *
+ * <p/>
* <ul>
* <li>record length in bytes, it is the sum of bytes used to store the key
* part and the value part.</li>
@@ -185,62 +380,28 @@ public class RCFile {
* <li>{the end of the key part}</li>
* </ul>
*/
- public static class KeyBuffer implements WritableComparable {
+ public static class KeyBuffer {
// each column's length in the value
private int[] eachColumnValueLen = null;
private int[] eachColumnUncompressedValueLen = null;
// stores each cell's length of a column in one buffer element
- private NonSyncDataOutputBuffer[] allCellValLenBuffer = null;
+ private NonSyncByteArrayOutputStream[] allCellValLenBuffer = null;
// how many rows in this split
private int numberRows = 0;
// how many columns
private int columnNumber = 0;
- // return the number of columns recorded in this file's header
- public int getColumnNumber() {
- return columnNumber;
- }
-
- @SuppressWarnings("unused")
- @Deprecated
- public KeyBuffer(){
- }
-
KeyBuffer(int columnNum) {
columnNumber = columnNum;
eachColumnValueLen = new int[columnNumber];
eachColumnUncompressedValueLen = new int[columnNumber];
- allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
- }
-
- @SuppressWarnings("unused")
- @Deprecated
- KeyBuffer(int numberRows, int columnNum) {
- this(columnNum);
- this.numberRows = numberRows;
+ allCellValLenBuffer = new NonSyncByteArrayOutputStream[columnNumber];
}
- /**
- * add in a new column's meta data.
- *
- * @param columnValueLen
- * this total bytes number of this column's values in this split
- * @param colValLenBuffer
- * each cell's length of this column's in this split
- */
- void setColumnLenInfo(int columnValueLen,
- NonSyncDataOutputBuffer colValLenBuffer,
- int columnUncompressedValueLen, int columnIndex) {
- eachColumnValueLen[columnIndex] = columnValueLen;
- eachColumnUncompressedValueLen[columnIndex] = columnUncompressedValueLen;
- allCellValLenBuffer[columnIndex] = colValLenBuffer;
- }
-
- @Override
public void readFields(DataInput in) throws IOException {
eachColumnValueLen = new int[columnNumber];
eachColumnUncompressedValueLen = new int[columnNumber];
- allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
+ allCellValLenBuffer = new NonSyncByteArrayOutputStream[columnNumber];
numberRows = WritableUtils.readVInt(in);
for (int i = 0; i < columnNumber; i++) {
@@ -248,7 +409,7 @@ public class RCFile {
eachColumnUncompressedValueLen[i] = WritableUtils.readVInt(in);
int bufLen = WritableUtils.readVInt(in);
if (allCellValLenBuffer[i] == null) {
- allCellValLenBuffer[i] = new NonSyncDataOutputBuffer();
+ allCellValLenBuffer[i] = new NonSyncByteArrayOutputStream();
} else {
allCellValLenBuffer[i].reset();
}
@@ -256,43 +417,11 @@ public class RCFile {
}
}
- @Override
- public void write(DataOutput out) throws IOException {
- // out.writeInt(numberRows);
- WritableUtils.writeVLong(out, numberRows);
- for (int i = 0; i < eachColumnValueLen.length; i++) {
- WritableUtils.writeVLong(out, eachColumnValueLen[i]);
- WritableUtils.writeVLong(out, eachColumnUncompressedValueLen[i]);
- NonSyncDataOutputBuffer colRowsLenBuf = allCellValLenBuffer[i];
- int bufLen = colRowsLenBuf.getLength();
- WritableUtils.writeVLong(out, bufLen);
- out.write(colRowsLenBuf.getData(), 0, bufLen);
- }
- }
-
/**
- * get number of bytes to store the keyBuffer.
- *
- * @return number of bytes used to store this KeyBuffer on disk
- * @throws IOException
+ * @return the numberRows
*/
- public int getSize() throws IOException {
- int ret = 0;
- ret += WritableUtils.getVIntSize(numberRows);
- for (int i = 0; i < eachColumnValueLen.length; i++) {
- ret += WritableUtils.getVIntSize(eachColumnValueLen[i]);
- ret += WritableUtils.getVIntSize(eachColumnUncompressedValueLen[i]);
- ret += WritableUtils.getVIntSize(allCellValLenBuffer[i].getLength());
- ret += allCellValLenBuffer[i].getLength();
- }
-
- return ret;
- }
-
- @Override
- public int compareTo(Object arg0) {
- throw new RuntimeException("compareTo not supported in class "
- + this.getClass().getName());
+ public int getNumberRows() {
+ return numberRows;
}
}
@@ -306,53 +435,10 @@ public class RCFile {
* column_2_row_2_value,....]</li>
* </ul>
*/
- public static class ValueBuffer implements WritableComparable {
-
- class LazyDecompressionCallbackImpl implements LazyDecompressionCallback {
-
- int index = -1;
- int colIndex = -1;
-
- public LazyDecompressionCallbackImpl(int index, int colIndex) {
- super();
- this.index = index;
- this.colIndex = colIndex;
- }
-
- @Override
- public byte[] decompress() throws IOException {
-
- if (decompressedFlag[index] || codec == null) {
- return loadedColumnsValueBuffer[index].getData();
- }
-
- NonSyncDataOutputBuffer compressedData = compressedColumnsValueBuffer[index];
- decompressBuffer.reset();
- DataInputStream valueIn = new DataInputStream(deflatFilter);
- deflatFilter.resetState();
- if (deflatFilter instanceof SchemaAwareCompressionInputStream) {
- ((SchemaAwareCompressionInputStream)deflatFilter).setColumnIndex(colIndex);
- }
- decompressBuffer.reset(compressedData.getData(),
- keyBuffer.eachColumnValueLen[colIndex]);
-
- NonSyncDataOutputBuffer decompressedColBuf = loadedColumnsValueBuffer[index];
- decompressedColBuf.reset();
- decompressedColBuf.write(valueIn,
- keyBuffer.eachColumnUncompressedValueLen[colIndex]);
- decompressedFlag[index] = true;
- numCompressed--;
- return decompressedColBuf.getData();
- }
- }
+ public static class ValueBuffer {
// used to load columns' value into memory
- private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
- private NonSyncDataOutputBuffer[] compressedColumnsValueBuffer = null;
- private boolean[] decompressedFlag = null;
- private int numCompressed;
- private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
- private boolean lazyDecompress = true;
+ private NonSyncByteArrayOutputStream[] loadedColumnsValueBuffer = null;
boolean inited = false;
@@ -364,104 +450,33 @@ public class RCFile {
boolean[] skippedColIDs = null;
CompressionCodec codec;
-
- Decompressor valDecompressor = null;
+ Decompressor decompressor = null;
NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
- CompressionInputStream deflatFilter = null;
-
- @SuppressWarnings("unused")
- @Deprecated
- public ValueBuffer() throws IOException {
- }
-
- @SuppressWarnings("unused")
- @Deprecated
- public ValueBuffer(KeyBuffer keyBuffer) throws IOException {
- this(keyBuffer, keyBuffer.columnNumber, null, null, true);
- }
- @SuppressWarnings("unused")
- @Deprecated
- public ValueBuffer(KeyBuffer keyBuffer, boolean[] skippedColIDs)
- throws IOException {
- this(keyBuffer, keyBuffer.columnNumber, skippedColIDs, null, true);
- }
- @SuppressWarnings("unused")
- @Deprecated
public ValueBuffer(KeyBuffer currentKey, int columnNumber,
- boolean[] skippedCols, CompressionCodec codec) throws IOException {
- this(currentKey, columnNumber, skippedCols, codec, true);
- }
-
- public ValueBuffer(KeyBuffer currentKey, int columnNumber,
- boolean[] skippedCols, CompressionCodec codec, boolean lazyDecompress)
+ int[] targets, CompressionCodec codec, boolean[] skippedIDs)
throws IOException {
- this.lazyDecompress = lazyDecompress;
keyBuffer = currentKey;
this.columnNumber = columnNumber;
-
- if (skippedCols != null && skippedCols.length > 0) {
- skippedColIDs = skippedCols;
- } else {
- skippedColIDs = new boolean[columnNumber];
- for (int i = 0; i < skippedColIDs.length; i++) {
- skippedColIDs[i] = false;
- }
- }
-
- int skipped = 0;
- for (boolean currentSkip : skippedColIDs) {
- if (currentSkip) {
- skipped++;
- }
- }
- loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
- - skipped];
- decompressedFlag = new boolean[columnNumber - skipped];
- lazyDecompressCallbackObjs = new LazyDecompressionCallbackImpl[columnNumber
- - skipped];
- compressedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
- - skipped];
+ this.skippedColIDs = skippedIDs;
this.codec = codec;
+ loadedColumnsValueBuffer = new NonSyncByteArrayOutputStream[targets.length];
+
if (codec != null) {
- valDecompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
- deflatFilter = codec.createInputStream(decompressBuffer,
- valDecompressor);
- }
- if (codec != null) {
- numCompressed = decompressedFlag.length;
- } else {
- numCompressed = 0;
- }
- for (int k = 0, readIndex = 0; k < columnNumber; k++) {
- if (skippedColIDs[k]) {
- continue;
- }
- loadedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
- if (codec != null) {
- decompressedFlag[readIndex] = false;
- lazyDecompressCallbackObjs[readIndex] = new LazyDecompressionCallbackImpl(
- readIndex, k);
- compressedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
- } else {
- decompressedFlag[readIndex] = true;
- }
- readIndex++;
+ decompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
}
- }
- @SuppressWarnings("unused")
- @Deprecated
- public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer,
- int addIndex) {
- loadedColumnsValueBuffer[addIndex] = valBuffer;
+ for (int i = 0; i < targets.length; i++) {
+ loadedColumnsValueBuffer[i] = new NonSyncByteArrayOutputStream();
+ }
}
- @Override
public void readFields(DataInput in) throws IOException {
int addIndex = 0;
int skipTotal = 0;
+
+
for (int i = 0; i < columnNumber; i++) {
int vaRowsLen = keyBuffer.eachColumnValueLen[i];
// skip this column
@@ -475,65 +490,61 @@ public class RCFile {
skipTotal = 0;
}
- NonSyncDataOutputBuffer valBuf;
- if (codec != null){
+ NonSyncByteArrayOutputStream valBuf;
+ if (codec != null) {
// load into compressed buf first
- valBuf = compressedColumnsValueBuffer[addIndex];
+
+ byte[] compressedBytes = new byte[vaRowsLen];
+ in.readFully(compressedBytes, 0, vaRowsLen);
+
+ decompressBuffer.reset(compressedBytes, vaRowsLen);
+ if (decompressor != null) decompressor.reset();
+
+ DataInputStream is;
+ if (codec instanceof SplittableCompressionCodec) {
+ SplitCompressionInputStream deflatFilter = ((SplittableCompressionCodec) codec).createInputStream(
+ decompressBuffer, decompressor, 0, vaRowsLen, SplittableCompressionCodec.READ_MODE.BYBLOCK);
+ is = new DataInputStream(deflatFilter);
+ } else {
+ CompressionInputStream deflatFilter = codec.createInputStream(decompressBuffer, decompressor);
+ is = new DataInputStream(deflatFilter);
+ }
+
+ valBuf = loadedColumnsValueBuffer[addIndex];
+ valBuf.reset();
+ valBuf.write(is, keyBuffer.eachColumnUncompressedValueLen[i]);
+ is.close();
+ decompressBuffer.close();
} else {
valBuf = loadedColumnsValueBuffer[addIndex];
- }
- valBuf.reset();
- valBuf.write(in, vaRowsLen);
- if (codec != null) {
- decompressedFlag[addIndex] = false;
- if (!lazyDecompress) {
- lazyDecompressCallbackObjs[addIndex].decompress();
- decompressedFlag[addIndex] = true;
- }
+ valBuf.reset();
+ valBuf.write(in, vaRowsLen);
}
addIndex++;
}
- if (codec != null) {
- numCompressed = decompressedFlag.length;
- }
if (skipTotal != 0) {
in.skipBytes(skipTotal);
}
}
- @Override
- public void write(DataOutput out) throws IOException {
- if (codec != null) {
- for (NonSyncDataOutputBuffer currentBuf : compressedColumnsValueBuffer) {
- out.write(currentBuf.getData(), 0, currentBuf.getLength());
- }
- } else {
- for (NonSyncDataOutputBuffer currentBuf : loadedColumnsValueBuffer) {
- out.write(currentBuf.getData(), 0, currentBuf.getLength());
- }
- }
- }
-
public void clearColumnBuffer() throws IOException {
decompressBuffer.reset();
}
public void close() {
- for (NonSyncDataOutputBuffer element : loadedColumnsValueBuffer) {
+ for (NonSyncByteArrayOutputStream element : loadedColumnsValueBuffer) {
IOUtils.closeStream(element);
}
if (codec != null) {
IOUtils.closeStream(decompressBuffer);
- org.apache.tajo.storage.compress.CodecPool.returnDecompressor(valDecompressor);
+ if (decompressor != null) {
+ // Make sure we only return decompressor once.
+ org.apache.tajo.storage.compress.CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
}
}
-
- @Override
- public int compareTo(Object arg0) {
- throw new RuntimeException("compareTo not supported in class "
- + this.getClass().getName());
- }
}
/**
@@ -543,12 +554,12 @@ public class RCFile {
public static Metadata createMetadata(Text... values) {
if (values.length % 2 != 0) {
throw new IllegalArgumentException("Must have a matched set of " +
- "key-value pairs. " + values.length+
+ "key-value pairs. " + values.length +
" strings supplied.");
}
Metadata result = new Metadata();
- for(int i=0; i < values.length; i += 2) {
- result.set(values[i], values[i+1]);
+ for (int i = 0; i < values.length; i += 2) {
+ result.set(values[i], values[i + 1]);
}
return result;
}
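For example, a caller could build header metadata from an even number of Text arguments (a small usage sketch; the keys are illustrative):

    // Builds a Metadata instance with two entries; an odd argument count
    // makes createMetadata throw IllegalArgumentException.
    Metadata headerMeta = RCFile.createMetadata(
        new Text("created.by"), new Text("tajo"),
        new Text("schema.version"), new Text("1"));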
@@ -556,21 +567,37 @@ public class RCFile {
/**
* Write KeyBuffer/ValueBuffer pairs to a RCFile. RCFile's format is
* compatible with SequenceFile's.
- *
*/
- public static class Writer {
-
+ public static class RCFileAppender extends FileAppender {
Configuration conf;
FSDataOutputStream out;
CompressionCodec codec = null;
Metadata metadata = null;
+ FileSystem fs = null;
+ TableStatistics stats = null;
+ int columnNumber = 0;
+
+ // how many records the writer buffers before it writes to disk
+ private int RECORD_INTERVAL = Integer.MAX_VALUE;
+ // the max size of memory for buffering records before writing them out
+ private int COLUMNS_BUFFER_SIZE = 16 * 1024 * 1024; // 16MB
+ // the conf string for COLUMNS_BUFFER_SIZE
+ public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
+
+ // how many records already buffered
+ private int bufferedRecords = 0;
+ private ColumnBuffer[] columnBuffers = null;
+ boolean useNewMagic = true;
+ private byte[] nullChars;
+ SerializeDeserialize serde;
// Insert a globally unique 16-byte value every few entries, so that one
// can seek into the middle of a file and then synchronize with record
// starts and ends by scanning for this value.
long lastSyncPos; // position of last sync
byte[] sync; // 16 random bytes
+
{
try {
MessageDigest digester = MessageDigest.getInstance("MD5");
@@ -582,36 +609,14 @@ public class RCFile {
}
}
- // how many records the writer buffers before it writes to disk
- private int RECORD_INTERVAL = Integer.MAX_VALUE;
- // the max size of memory for buffering records before writes them out
- private int columnsBufferSize = 4 * 1024 * 1024; // 4M
- // the conf string for COLUMNS_BUFFER_SIZE
- public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
-
- // how many records already buffered
- private int bufferedRecords = 0;
-
- private final ColumnBuffer[] columnBuffers;
-
- private int columnNumber = 0;
-
- private final int[] columnValuePlainLength;
-
- KeyBuffer key = null;
- private final int[] plainTotalColumnLength;
- private final int[] comprTotalColumnLength;
-
- boolean useNewMagic = true;
-
/*
* used for buffering appends before flushing them out
*/
class ColumnBuffer {
// used to buffer a column's values
- NonSyncDataOutputBuffer columnValBuffer;
+ NonSyncByteArrayOutputStream columnValBuffer;
// used to store each value's length
- NonSyncDataOutputBuffer valLenBuffer;
+ NonSyncByteArrayOutputStream valLenBuffer;
/*
* use a run-length encoding. We only record run length if a same
@@ -620,21 +625,25 @@ public class RCFile {
* example, if the values' lengths are 1,1,1,2, we record 1, ~2,2. And for
* value lengths 1,2,3 we record 1,2,3.
*/
+ int columnValueLength = 0;
+ int uncompressedColumnValueLength = 0;
+ int columnKeyLength = 0;
int runLength = 0;
int prevValueLength = -1;
ColumnBuffer() throws IOException {
- columnValBuffer = new NonSyncDataOutputBuffer();
- valLenBuffer = new NonSyncDataOutputBuffer();
+ columnValBuffer = new NonSyncByteArrayOutputStream();
+ valLenBuffer = new NonSyncByteArrayOutputStream();
}
- public void append(BytesRefWritable data) throws IOException {
- data.writeDataTo(columnValBuffer);
- int currentLen = data.getLength();
+ public int append(Column column, Datum datum) throws IOException {
+ int currentLen = serde.serialize(column, datum, columnValBuffer, nullChars);
+ columnValueLength += currentLen;
+ uncompressedColumnValueLength += currentLen;
if (prevValueLength < 0) {
startNewGroup(currentLen);
- return;
+ return currentLen;
}
if (currentLen != prevValueLength) {
@@ -643,6 +652,7 @@ public class RCFile {
} else {
runLength++;
}
+ return currentLen;
}
private void startNewGroup(int currentLen) {
@@ -650,22 +660,39 @@ public class RCFile {
runLength = 0;
}
- public void clear() throws IOException {
+ public void clear() {
valLenBuffer.reset();
columnValBuffer.reset();
prevValueLength = -1;
runLength = 0;
+ columnValueLength = 0;
+ columnKeyLength = 0;
+ uncompressedColumnValueLength = 0;
}
- public void flushGroup() throws IOException {
+ public int flushGroup() {
+ int len = 0;
if (prevValueLength >= 0) {
- WritableUtils.writeVLong(valLenBuffer, prevValueLength);
+ len += valLenBuffer.writeVLong(prevValueLength);
if (runLength > 0) {
- WritableUtils.writeVLong(valLenBuffer, ~runLength);
+ len += valLenBuffer.writeVLong(~runLength);
}
+ columnKeyLength += len;
runLength = -1;
prevValueLength = -1;
}
+ return len;
+ }
+
+ public int unFlushedGroupSize() {
+ int len = 0;
+ if (prevValueLength >= 0) {
+ len += WritableUtils.getVIntSize(prevValueLength);
+ if (runLength > 0) {
+ len += WritableUtils.getVIntSize(~runLength);
+ }
+ }
+ return len;
}
}
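To make the length encoding concrete, here is a standalone sketch, not part of the patch, that mirrors what append() and flushGroup() above produce for a sequence of value lengths:

    // Run-length encodes value lengths the way ColumnBuffer does: a run of
    // equal lengths is emitted as the length followed by ~(occurrences - 1);
    // a length that occurs once is emitted by itself.
    static java.util.List<Integer> encodeValueLengths(int[] lengths) {
      java.util.List<Integer> out = new java.util.ArrayList<>();
      int prevValueLength = -1;
      int runLength = 0;
      for (int len : lengths) {
        if (prevValueLength < 0) {
          prevValueLength = len;           // start the first group
        } else if (len == prevValueLength) {
          runLength++;                     // same length repeats
        } else {
          out.add(prevValueLength);        // flush the finished group
          if (runLength > 0) out.add(~runLength);
          prevValueLength = len;
          runLength = 0;
        }
      }
      if (prevValueLength >= 0) {          // flush the last group
        out.add(prevValueLength);
        if (runLength > 0) out.add(~runLength);
      }
      return out;                          // [1, 1, 1, 2] -> [1, -3, 2], i.e. 1, ~2, 2
    }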
@@ -673,98 +700,76 @@ public class RCFile {
return out.getPos();
}
- /** Constructs a RCFile Appender. */
- public Writer(FileSystem fs, Configuration conf, Path name) throws IOException {
- this(fs, conf, name, null, new Metadata(), null);
- }
+ public RCFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
+ super(conf, schema, meta, path);
- /**
- * Constructs a RCFile Appender.
- *
- * @param fs
- * the file system used
- * @param conf
- * the configuration file
- * @param name
- * the file name
- * @throws IOException
- */
- public Writer(FileSystem fs, Configuration conf, Path name,
- Progressable progress, CompressionCodec codec) throws IOException {
- this(fs, conf, name, progress, new Metadata(), codec);
+ this.conf = conf;
+ this.fs = path.getFileSystem(conf);
+ RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
+ COLUMNS_BUFFER_SIZE = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR, COLUMNS_BUFFER_SIZE);
+ columnNumber = schema.getColumnNum();
}
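As a usage sketch, the two buffering thresholds read by this constructor can be tuned through the configuration before the appender is created (the values below are illustrative):

    Configuration conf = new Configuration();
    // flush a row group once this many records have been buffered ...
    conf.setInt("hive.io.rcfile.record.interval", 100000);
    // ... or once the buffered column data exceeds this many bytes
    conf.setInt("hive.io.rcfile.record.buffer.size", 4 * 1024 * 1024);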
- /**
- * Constructs a RCFile Appender.
- *
- * @param fs
- * the file system used
- * @param conf
- * the configuration file
- * @param name
- * the file name
- * @param progress a progress meter to update as the file is written
- * @param metadata a string to string map in the file header
- * @throws IOException
- */
- public Writer(FileSystem fs, Configuration conf, Path name,
- Progressable progress, Metadata metadata, CompressionCodec codec) throws IOException {
- this(fs, conf, name, fs.getConf().getInt("io.file.buffer.size", 4096),
- fs.getDefaultReplication(), fs.getDefaultBlockSize(), progress,
- metadata, codec);
- }
+ public void init() throws IOException {
+ fs = path.getFileSystem(conf);
- /**
- *
- * Constructs a RCFile Appender.
- *
- * @param fs
- * the file system used
- * @param conf
- * the configuration file
- * @param name
- * the file name
- * @param bufferSize the size of the file buffer
- * @param replication the number of replicas for the file
- * @param blockSize the block size of the file
- * @param progress the progress meter for writing the file
- * @param metadata a string to string map in the file header
- * @throws IOException
- */
- public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize,
- short replication, long blockSize, Progressable progress,
- Metadata metadata, CompressionCodec codec) throws IOException {
- RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
- columnNumber = conf.getInt(COLUMN_NUMBER_CONF_STR, 0);
+ if (!fs.exists(path.getParent())) {
+ throw new FileNotFoundException(path.toString());
+ }
+
+ String codecClassname = this.meta.getOption(TableMeta.COMPRESSION_CODEC);
+ if (!StringUtils.isEmpty(codecClassname)) {
+ try {
+ Class<? extends CompressionCodec> codecClass = conf.getClassByName(
+ codecClassname).asSubclass(CompressionCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IllegalArgumentException(
+ "Unknown codec: " + codecClassname, cnfe);
+ }
+ }
+
+ String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(NULL));
+ if (StringUtils.isEmpty(nullCharacters)) {
+ nullChars = NullDatum.get().asTextBytes();
+ } else {
+ nullChars = nullCharacters.getBytes();
+ }
if (metadata == null) {
metadata = new Metadata();
}
- metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text(""
- + columnNumber));
- columnsBufferSize = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR,
- 4 * 1024 * 1024);
+ metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text("" + columnNumber));
- columnValuePlainLength = new int[columnNumber];
+ String serdeClass = this.meta.getOption(SERDE, BinarySerializeDeserialize.class.getName());
+ try {
+ serde = (SerializeDeserialize) Class.forName(serdeClass).newInstance();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+ metadata.set(new Text(SERDE), new Text(serdeClass));
columnBuffers = new ColumnBuffer[columnNumber];
for (int i = 0; i < columnNumber; i++) {
columnBuffers[i] = new ColumnBuffer();
}
- init(conf, fs.create(name, true, bufferSize, replication,
- blockSize, progress), codec, metadata);
+ init(conf, fs.create(path, true, 4096, (short) 3, fs.getDefaultBlockSize(), null), codec, metadata);
initializeFileHeader();
writeFileHeader();
finalizeFileHeader();
- key = new KeyBuffer(columnNumber);
- plainTotalColumnLength = new int[columnNumber];
- comprTotalColumnLength = new int[columnNumber];
+ if (enabledStats) {
+ this.stats = new TableStatistics(this.schema);
+ }
+ super.init();
}
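For reference, a hedged sketch of how the two table options consumed by init() might be supplied. It assumes TableMeta exposes a putOption setter, which this patch does not show:

    // The null sequence is passed through StringEscapeUtils.unescapeJava
    // before use, so escaped forms are accepted.
    meta.putOption(RCFile.NULL, "NULL");   // assumed setter
    meta.putOption(RCFile.SERDE, BinarySerializeDeserialize.class.getName());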
- /** Write the initial part of file header. */
+ /**
+ * Write the initial part of file header.
+ */
void initializeFileHeader() throws IOException {
if (useNewMagic) {
out.write(MAGIC);
@@ -774,7 +779,9 @@ public class RCFile {
}
}
- /** Write the final part of file header. */
+ /**
+ * Write the final part of file header.
+ */
void finalizeFileHeader() throws IOException {
out.write(sync); // write the sync bytes
out.flush(); // flush header
@@ -784,13 +791,15 @@ public class RCFile {
return codec != null;
}
- /** Write and flush the file header. */
+ /**
+ * Write and flush the file header.
+ */
void writeFileHeader() throws IOException {
if (useNewMagic) {
out.writeBoolean(isCompressed());
} else {
- Text.writeString(out, KeyBuffer.class.getName());
- Text.writeString(out, ValueBuffer.class.getName());
+ Text.writeString(out, "org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer");
+ Text.writeString(out, "org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer");
out.writeBoolean(isCompressed());
out.writeBoolean(false);
}
@@ -807,18 +816,12 @@ public class RCFile {
this.out = out;
this.codec = codec;
this.metadata = metadata;
- this.useNewMagic =
- conf.getBoolean(TajoConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true);
- }
-
- /** Returns the compression codec of data in this file. */
- @SuppressWarnings("unused")
- @Deprecated
- public CompressionCodec getCompressionCodec() {
- return codec;
+ this.useNewMagic = conf.getBoolean(TajoConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true);
}
- /** create a sync point. */
+ /**
+ * create a sync point.
+ */
public void sync() throws IOException {
if (sync != null && lastSyncPos != out.getPos()) {
out.writeInt(SYNC_ESCAPE); // mark the start of the sync
@@ -827,13 +830,6 @@ public class RCFile {
}
}
- /** Returns the configuration of this file. */
- @SuppressWarnings("unused")
- @Deprecated
- Configuration getConf() {
- return conf;
- }
-
private void checkAndWriteSync() throws IOException {
if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
sync();
@@ -842,115 +838,191 @@ public class RCFile {
private int columnBufferSize = 0;
+ @Override
+ public long getOffset() throws IOException {
+ return out.getPos();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ flushRecords();
+ out.flush();
+ }
+
+ @Override
+ public void addTuple(Tuple t) throws IOException {
+ append(t);
+ // Statistical section
+
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ }
+
/**
* Append a row of values. Currently it can only accept a
- * {@link BytesRefArrayWritable}. If its <code>size()</code> is less than the
+ * {@link Tuple}. If its <code>size()</code> is less than the
* column number in the file, zero bytes are appended for the empty columns.
* If its size() is greater than the column number in the file, the extra
* columns' bytes are ignored.
*
- * @param val a BytesRefArrayWritable with the list of serialized columns
+ * @param tuple a Tuple with the list of serialized columns
* @throws IOException
*/
- public void append(Writable val) throws IOException {
+ public void append(Tuple tuple) throws IOException {
+ int size = schema.getColumnNum();
- if (!(val instanceof BytesRefArrayWritable)) {
- throw new UnsupportedOperationException(
- "Currently the writer can only accept BytesRefArrayWritable");
- }
-
- BytesRefArrayWritable columns = (BytesRefArrayWritable) val;
- int size = columns.size();
for (int i = 0; i < size; i++) {
- BytesRefWritable cu = columns.get(i);
- int plainLen = cu.getLength();
- columnBufferSize += plainLen;
- columnValuePlainLength[i] += plainLen;
- columnBuffers[i].append(cu);
+ Datum datum = tuple.get(i);
+ int length = columnBuffers[i].append(schema.getColumn(i), datum);
+ columnBufferSize += length;
+ if (enabledStats) {
+ stats.analyzeField(i, datum);
+ }
}
if (size < columnNumber) {
- for (int i = columns.size(); i < columnNumber; i++) {
- columnBuffers[i].append(BytesRefWritable.ZeroBytesRefWritable);
+ for (int i = size; i < columnNumber; i++) {
+ columnBuffers[i].append(schema.getColumn(i), NullDatum.get());
+ if (enabledStats) {
+ stats.analyzeField(i, NullDatum.get());
+ }
}
}
bufferedRecords++;
- if ((columnBufferSize > columnsBufferSize)
- || (bufferedRecords >= RECORD_INTERVAL)) {
- flushRecords();
+ if (this.isCompressed()) {
+ //TODO compression rate base flush
+ if ((columnBufferSize > COLUMNS_BUFFER_SIZE)
+ || (bufferedRecords >= RECORD_INTERVAL)) {
+ flushRecords();
+ }
+ } else {
+ //TODO block base flush
+ if ((columnBufferSize > COLUMNS_BUFFER_SIZE)
+ || (bufferedRecords >= RECORD_INTERVAL)) {
+ flushRecords();
+ }
}
}
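A rough end-to-end sketch of the appender lifecycle, using only the methods shown in this patch (schema, meta, path, and the tuple source are assumed to exist):

    RCFileAppender appender = new RCFileAppender(conf, schema, meta, path);
    appender.init();             // creates the file and writes the RCFile header
    for (Tuple t : tuples) {
      appender.addTuple(t);      // buffers the row; may flush a full row group
    }
    appender.close();            // flushes any remaining records and closes the stream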
- private void flushRecords() throws IOException {
+ /**
+ * get number of bytes to store the keyBuffer.
+ *
+ * @return number of bytes used to store this KeyBuffer on disk
+ * @throws IOException
+ */
+ public int getKeyBufferSize() throws IOException {
+ int ret = 0;
+ ret += WritableUtils.getVIntSize(bufferedRecords);
+ for (int i = 0; i < columnBuffers.length; i++) {
+ ColumnBuffer currentBuf = columnBuffers[i];
+ ret += WritableUtils.getVIntSize(currentBuf.columnValueLength);
+ ret += WritableUtils.getVIntSize(currentBuf.uncompressedColumnValueLength);
+ ret += WritableUtils.getVIntSize(currentBuf.columnKeyLength);
+ ret += currentBuf.columnKeyLength;
+ }
- key.numberRows = bufferedRecords;
+ return ret;
+ }
+
+ /**
+ * get number of bytes to store the key part.
+ *
+ * @return number of bytes used to store this Key part on disk
+ * @throws IOException
+ */
+ public int getKeyPartSize() throws IOException {
+ int ret = 12; // 12 bytes: record length, key length, compressed key length
+
+ ret += WritableUtils.getVIntSize(bufferedRecords);
+ for (int i = 0; i < columnBuffers.length; i++) {
+ ColumnBuffer currentBuf = columnBuffers[i];
+ ret += WritableUtils.getVIntSize(currentBuf.columnValueLength);
+ ret += WritableUtils.getVIntSize(currentBuf.uncompressedColumnValueLength);
+ ret += WritableUtils.getVIntSize(currentBuf.columnKeyLength);
+ ret += currentBuf.columnKeyLength;
+ ret += currentBuf.unFlushedGroupSize();
+ }
+
+ return ret;
+ }
+
+ private void writeKeyBuffer(DataOutputStream out) throws IOException {
+ WritableUtils.writeVLong(out, bufferedRecords);
+ for (int i = 0; i < columnBuffers.length; i++) {
+ ColumnBuffer currentBuf = columnBuffers[i];
+ WritableUtils.writeVLong(out, currentBuf.columnValueLength);
+ WritableUtils.writeVLong(out, currentBuf.uncompressedColumnValueLength);
+ WritableUtils.writeVLong(out, currentBuf.columnKeyLength);
+ currentBuf.valLenBuffer.writeTo(out);
+ }
+ }
+
+ private void flushRecords() throws IOException {
Compressor compressor = null;
- NonSyncDataOutputBuffer valueBuffer = null;
+ NonSyncByteArrayOutputStream valueBuffer = null;
CompressionOutputStream deflateFilter = null;
DataOutputStream deflateOut = null;
boolean isCompressed = isCompressed();
+
int valueLength = 0;
if (isCompressed) {
- ReflectionUtils.setConf(codec, this.conf);
compressor = org.apache.tajo.storage.compress.CodecPool.getCompressor(codec);
- valueBuffer = new NonSyncDataOutputBuffer();
+ if (compressor != null) compressor.reset(); // the built-in gzip codec can return a null compressor
+
+ valueBuffer = new NonSyncByteArrayOutputStream();
deflateFilter = codec.createOutputStream(valueBuffer, compressor);
- deflateOut = new DataOutputStream(deflateFilter);
+ deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
}
for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
ColumnBuffer currentBuf = columnBuffers[columnIndex];
currentBuf.flushGroup();
- NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
+ NonSyncByteArrayOutputStream columnValue = currentBuf.columnValBuffer;
int colLen;
- int plainLen = columnValuePlainLength[columnIndex];
-
+ int plainLen = columnValue.getLength();
if (isCompressed) {
- if (deflateFilter instanceof SchemaAwareCompressionOutputStream) {
- ((SchemaAwareCompressionOutputStream)deflateFilter).
- setColumnIndex(columnIndex);
- }
deflateFilter.resetState();
deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
deflateOut.flush();
deflateFilter.finish();
+ columnValue.close();
// find how much compressed data was added for this column
colLen = valueBuffer.getLength() - valueLength;
+ currentBuf.columnValueLength = colLen;
} else {
- colLen = columnValuePlainLength[columnIndex];
+ colLen = plainLen;
}
valueLength += colLen;
- key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, plainLen,
- columnIndex);
- plainTotalColumnLength[columnIndex] += plainLen;
- comprTotalColumnLength[columnIndex] += colLen;
- columnValuePlainLength[columnIndex] = 0;
}
- int keyLength = key.getSize();
+ int keyLength = getKeyBufferSize();
if (keyLength < 0) {
- throw new IOException("negative length keys not allowed: " + key);
+ throw new IOException("negative length keys not allowed: " + keyLength);
}
if (compressor != null) {
org.apache.tajo.storage.compress.CodecPool.returnCompressor(compressor);
}
// Write the key out
- writeKey(key, keyLength + valueLength, keyLength);
+ writeKey(keyLength + valueLength, keyLength);
// write the value out
if (isCompressed) {
out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
+ valueBuffer.close();
} else {
- for(int columnIndex=0; columnIndex < columnNumber; ++columnIndex) {
- NonSyncDataOutputBuffer buf =
- columnBuffers[columnIndex].columnValBuffer;
- out.write(buf.getData(), 0, buf.getLength());
+ for (int columnIndex = 0; columnIndex < columnNumber; ++columnIndex) {
+ columnBuffers[columnIndex].columnValBuffer.writeTo(out);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Column#" + columnIndex + " : Plain Total Column Value Length: "
+ + columnBuffers[columnIndex].uncompressedColumnValueLength
+ + ", Compr Total Column Value Length: " + columnBuffers[columnIndex].columnValueLength);
+ }
}
}
-
// clear the columnBuffers
clearColumnBuffers();
@@ -958,41 +1030,34 @@ public class RCFile {
columnBufferSize = 0;
}
- /**
- * flush a block out without doing anything except compressing the key part.
- */
- public void flushBlock(KeyBuffer keyBuffer, ValueBuffer valueBuffer,
- int recordLen, int keyLength,
- @SuppressWarnings("unused") int compressedKeyLen) throws IOException {
- writeKey(keyBuffer, recordLen, keyLength);
- valueBuffer.write(out);
- }
-
- private void writeKey(KeyBuffer keyBuffer, int recordLen,
- int keyLength) throws IOException {
+ private void writeKey(int recordLen, int keyLength) throws IOException {
checkAndWriteSync(); // sync
out.writeInt(recordLen); // total record length
out.writeInt(keyLength); // key portion length
- if(this.isCompressed()) {
+ if (this.isCompressed()) {
Compressor compressor = org.apache.tajo.storage.compress.CodecPool.getCompressor(codec);
- NonSyncDataOutputBuffer compressionBuffer =
- new NonSyncDataOutputBuffer();
- CompressionOutputStream deflateFilter =
- codec.createOutputStream(compressionBuffer, compressor);
- DataOutputStream deflateOut = new DataOutputStream(deflateFilter);
+ if (compressor != null) compressor.reset(); // the built-in gzip codec can return a null compressor
+
+ NonSyncByteArrayOutputStream compressionBuffer = new NonSyncByteArrayOutputStream();
+ CompressionOutputStream deflateFilter = codec.createOutputStream(compressionBuffer, compressor);
+ DataOutputStream deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
+
//compress key and write key out
compressionBuffer.reset();
deflateFilter.resetState();
- keyBuffer.write(deflateOut);
+ writeKeyBuffer(deflateOut);
deflateOut.flush();
deflateFilter.finish();
int compressedKeyLen = compressionBuffer.getLength();
out.writeInt(compressedKeyLen);
- out.write(compressionBuffer.getData(), 0, compressedKeyLen);
+ compressionBuffer.writeTo(out);
+ compressionBuffer.reset();
+ deflateOut.close();
+ org.apache.tajo.storage.compress.CodecPool.returnCompressor(compressor);
} else {
out.writeInt(keyLength);
- keyBuffer.write(out);
+ writeKeyBuffer(out);
}
}
@@ -1002,6 +1067,16 @@ public class RCFile {
}
}
+ @Override
+ public TableStats getStats() {
+ if (enabledStats) {
+ return stats.getTableStat();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
public synchronized void close() throws IOException {
if (bufferedRecords > 0) {
flushRecords();
@@ -1009,33 +1084,27 @@ public class RCFile {
clearColumnBuffers();
if (out != null) {
-
// Close the underlying stream if we own it...
out.flush();
out.close();
out = null;
}
- for (int i = 0; i < columnNumber; i++) {
- LOG.info("Column#" + i + " : Plain Total Column Value Length: "
- + plainTotalColumnLength[i]
- + ", Compr Total Column Value Length: " + comprTotalColumnLength[i]);
- }
}
}
/**
* Read KeyBuffer/ValueBuffer pairs from a RCFile.
- *
*/
- public static class Reader {
+ public static class RCFileScanner extends FileScanner {
private static class SelectedColumn {
public int colIndex;
public int rowReadIndex;
public int runLength;
public int prvLength;
+ public boolean isNulled;
}
- private final Path file;
- private final FSDataInputStream in;
+
+ private FSDataInputStream in;
private byte version;
@@ -1048,13 +1117,16 @@ public class RCFile {
private long lastSeenSyncPos = 0;
private long headerEnd;
- private final long end;
+ private long start, end;
+ private long startOffset, endOffset;
+ private int[] targetColumnIndexes;
+
private int currentKeyLength;
private int currentRecordLength;
private final Configuration conf;
- private final ValueBuffer currentValue;
+ private ValueBuffer currentValue;
private int readRowsIndexInBuffer = 0;
@@ -1062,52 +1134,76 @@ public class RCFile {
private int columnNumber = 0;
- private int loadColumnNum;
+ private boolean more = true;
private int passedRowsNum = 0;
- // Should we try to tolerate corruption? Default is No.
- private boolean tolerateCorruptions = false;
-
private boolean decompress = false;
private Decompressor keyDecompressor;
- NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
+
//Current state of each selected column - e.g. current run length, etc.
// The size of the array is equal to the number of selected columns
- private final SelectedColumn[] selectedColumns;
-
- // map of original column id -> index among selected columns
- private final int[] revPrjColIDs;
+ private SelectedColumn[] selectedColumns;
// column value lengths for each of the selected columns
- private final NonSyncDataInputBuffer[] colValLenBufferReadIn;
+ private NonSyncDataInputBuffer[] colValLenBufferReadIn;
- /** Create a new RCFile reader. */
- public Reader(FileSystem fs, Path file, Configuration conf) throws IOException {
- this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, 0, fs
- .getFileStatus(file).getLen());
- }
+ private LongWritable rowId;
+ private byte[] nullChars;
+ private SerializeDeserialize serde;
+
+ public RCFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
+ final FileFragment fragment) throws IOException {
+ super(conf, schema, meta, fragment);
+
+ rowId = new LongWritable();
+ conf.setInt("io.file.buffer.size", 4096); //TODO remove
- /** Create a new RCFile reader. */
- public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
- long start, long length) throws IOException {
- tolerateCorruptions = conf.getBoolean(
- TOLERATE_CORRUPTIONS_CONF_STR, false);
- conf.setInt("io.file.buffer.size", bufferSize);
- this.file = file;
- in = openFile(fs, file, bufferSize, length);
+
+ startOffset = fragment.getStartKey();
+ endOffset = startOffset + fragment.getEndKey();
this.conf = conf;
- end = start + length;
+ more = startOffset < endOffset;
+ start = 0;
+ }
+
+ @Override
+ public void init() throws IOException {
+ String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(NULL));
+ if (StringUtils.isEmpty(nullCharacters)) {
+ nullChars = NullDatum.get().asTextBytes();
+ } else {
+ nullChars = nullCharacters.getBytes();
+ }
+
+ // projection
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+ targetColumnIndexes = new int[targets.length];
+ for (int i = 0; i < targets.length; i++) {
+ targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
+ }
+ Arrays.sort(targetColumnIndexes);
+
+ FileSystem fs = fragment.getPath().getFileSystem(conf);
+ end = fs.getFileStatus(fragment.getPath()).getLen();
+ in = openFile(fs, fragment.getPath(), 4096);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RCFile open:" + fragment.getPath() + "," + start + "," + (endOffset - startOffset) +
+ "," + fs.getFileStatus(fragment.getPath()).getLen());
+ }
+ // init RCFile header
boolean succeed = false;
try {
if (start > 0) {
seek(0);
- init();
- seek(start);
+ initHeader();
} else {
- init();
+ initHeader();
}
succeed = true;
} finally {
@@ -1115,7 +1211,7 @@ public class RCFile {
if (in != null) {
try {
in.close();
- } catch(IOException e) {
+ } catch (IOException e) {
if (LOG != null && LOG.isDebugEnabled()) {
LOG.debug("Exception in closing " + in, e);
}
@@ -1124,64 +1220,34 @@ public class RCFile {
}
}
- columnNumber = Integer.parseInt(metadata.get(
- new Text(COLUMN_NUMBER_METADATA_STR)).toString());
-
- java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils
- .getReadColumnIDs(conf);
+ columnNumber = Integer.parseInt(metadata.get(new Text(COLUMN_NUMBER_METADATA_STR)).toString());
+ selectedColumns = new SelectedColumn[targetColumnIndexes.length];
+ colValLenBufferReadIn = new NonSyncDataInputBuffer[targetColumnIndexes.length];
boolean[] skippedColIDs = new boolean[columnNumber];
- if (notSkipIDs.size() > 0) {
- for (int i = 0; i < skippedColIDs.length; i++) {
- skippedColIDs[i] = true;
- }
- for (int read : notSkipIDs) {
- if (read < columnNumber) {
- skippedColIDs[read] = false;
- }
- }
- } else {
- // TODO: if no column name is specified e.g, in select count(1) from tt;
- // skip all columns, this should be distinguished from the case:
- // select * from tt;
- for (int i = 0; i < skippedColIDs.length; i++) {
- skippedColIDs[i] = false;
- }
- }
-
- loadColumnNum = columnNumber;
- if (skippedColIDs.length > 0) {
- for (boolean skippedColID : skippedColIDs) {
- if (skippedColID) {
- loadColumnNum -= 1;
- }
- }
- }
+ Arrays.fill(skippedColIDs, true);
+ super.init();
+ for (int i = 0; i < targetColumnIndexes.length; i++) {
+ int tid = targetColumnIndexes[i];
+ if (tid < columnNumber) {
+ skippedColIDs[tid] = false;
- revPrjColIDs = new int[columnNumber];
- // get list of selected column IDs
- selectedColumns = new SelectedColumn[loadColumnNum];
- colValLenBufferReadIn = new NonSyncDataInputBuffer[loadColumnNum];
- for (int i = 0, j = 0; i < columnNumber; ++i) {
- if (!skippedColIDs[i]) {
SelectedColumn col = new SelectedColumn();
- col.colIndex = i;
+ col.colIndex = tid;
col.runLength = 0;
col.prvLength = -1;
col.rowReadIndex = 0;
- selectedColumns[j] = col;
- colValLenBufferReadIn[j] = new NonSyncDataInputBuffer();
- revPrjColIDs[i] = j;
- j++;
- } else {
- revPrjColIDs[i] = -1;
+ selectedColumns[i] = col;
+ colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
}
}
currentKey = createKeyBuffer();
- boolean lazyDecompress = !tolerateCorruptions;
- currentValue = new ValueBuffer(
- null, columnNumber, skippedColIDs, codec, lazyDecompress);
+ currentValue = new ValueBuffer(null, columnNumber, targetColumnIndexes, codec, skippedColIDs);
+
+ if (startOffset > getPosition()) { // TODO use sync cache
+ sync(startOffset); // sync to start
+ }
}
/**
@@ -1194,6 +1260,7 @@ public class RCFile {
/**
* Return the metadata value associated with the given key.
+ *
* @param key the metadata key to retrieve
*/
public Text getMetadataValueOf(Text key) {
@@ -1204,25 +1271,24 @@ public class RCFile {
* Override this method to specialize the type of
* {@link FSDataInputStream} returned.
*/
- protected FSDataInputStream openFile(FileSystem fs, Path file,
- int bufferSize, long length) throws IOException {
+ protected FSDataInputStream openFile(FileSystem fs, Path file, int bufferSize) throws IOException {
return fs.open(file, bufferSize);
}
- private void init() throws IOException {
+ private void initHeader() throws IOException {
byte[] magic = new byte[MAGIC.length];
in.readFully(magic);
if (Arrays.equals(magic, ORIGINAL_MAGIC)) {
byte vers = in.readByte();
if (vers != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
- throw new IOException(file + " is a version " + vers +
+ throw new IOException(fragment.getPath() + " is a version " + vers +
" SequenceFile instead of an RCFile.");
}
version = ORIGINAL_VERSION;
} else {
if (!Arrays.equals(magic, MAGIC)) {
- throw new IOException(file + " not a RCFile and has magic of " +
+ throw new IOException(fragment.getPath() + " not an RCFile and has magic of " +
new String(magic));
}
@@ -1239,10 +1305,10 @@ public class RCFile {
Class<?> valCls = conf.getClassByName(Text.readString(in));
if (!keyCls.equals(KeyBuffer.class)
|| !valCls.equals(ValueBuffer.class)) {
- throw new IOException(file + " not a RCFile");
+ throw new IOException(fragment.getPath() + " not an RCFile");
}
} catch (ClassNotFoundException e) {
- throw new IOException(file + " not a RCFile", e);
+ throw new IOException(fragment.getPath() + " not an RCFile", e);
}
}
@@ -1252,7 +1318,7 @@ public class RCFile {
// is block-compressed? it should be always false.
boolean blkCompressed = in.readBoolean();
if (blkCompressed) {
- throw new IOException(file + " not a RCFile.");
+ throw new IOException(fragment.getPath() + " not an RCFile.");
}
}
@@ -1267,30 +1333,48 @@ public class RCFile {
throw new IllegalArgumentException(
"Unknown codec: " + codecClassname, cnfe);
}
+
keyDecompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
}
metadata = new Metadata();
metadata.readFields(in);
+ Text text = metadata.get(new Text(SERDE));
+
+ try {
+ String serdeClass;
+ if (text != null && !text.toString().isEmpty()) {
+ serdeClass = text.toString();
+ } else {
+ serdeClass = this.meta.getOption(SERDE, BinarySerializeDeserialize.class.getName());
+ }
+ serde = (SerializeDeserialize) Class.forName(serdeClass).newInstance();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+
in.readFully(sync); // read sync bytes
headerEnd = in.getPos();
}
- /** Return the current byte position in the input file. */
+ /**
+ * Return the current byte position in the input file.
+ */
public synchronized long getPosition() throws IOException {
return in.getPos();
}
/**
* Set the current byte position in the input file.
- *
- * <p>
+ * <p/>
+ * <p/>
* The position passed must be a position returned by
- * {@link RCFile.Writer#getLength()} when writing this file. To seek to an
- * arbitrary position, use {@link RCFile.Reader#sync(long)}. In another
+ * {@link RCFile.RCFileAppender#getLength()} when writing this file. To seek to an
+ * arbitrary position, use {@link RCFile.RCFileScanner#sync(long)}. In other
* words, the current seek can only seek to the end of the file. For other
- * positions, use {@link RCFile.Reader#sync(long)}.
+ * positions, use {@link RCFile.RCFileScanner#sync(long)}.
*/
public synchronized void seek(long position) throws IOException {
in.seek(position);
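
The serde lookup added in this hunk resolves the serializer class in a fixed order: the class name recorded in the file's own metadata wins, the table option is the fallback, and BinarySerializeDeserialize is the default. A hedged sketch of that order, with the two lookups passed in as plain strings (resolveSerde is a hypothetical helper standing in for the Metadata/TableMeta calls):

    import java.io.IOException;

    public class SerdeResolutionSketch {
      // fromFileMetadata ~ metadata.get(new Text(SERDE));
      // fromTableMeta ~ meta.getOption(SERDE, default), already defaulted by the caller.
      static Object resolveSerde(String fromFileMetadata, String fromTableMeta) throws IOException {
        String serdeClass = (fromFileMetadata != null && !fromFileMetadata.isEmpty())
            ? fromFileMetadata   // 1) class written into the RCFile header
            : fromTableMeta;     // 2) table option / default
        try {
          return Class.forName(serdeClass).newInstance();  // reflective instantiation, as in the patch
        } catch (Exception e) {
          throw new IOException(e);  // surface reflection failures the same way initHeader() does
        }
      }
    }
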
@@ -1298,7 +1382,7 @@ public class RCFile {
/**
* Resets the values which determine if there are more rows in the buffer
- *
+ * <p/>
* This can be used after one calls seek or sync, if one called next before that.
* Otherwise, the seek or sync will have no effect, it will continue to get rows from the
* buffer built up from the call to next.
@@ -1308,7 +1392,9 @@ public class RCFile {
recordsNumInValBuffer = 0;
}
- /** Seek to the next sync mark past a given position. */
+ /**
+ * Seek to the next sync mark past a given position.
+ */
public synchronized void sync(long position) throws IOException {
if (position + SYNC_SIZE >= end) {
seek(end);
@@ -1326,21 +1412,31 @@ public class RCFile {
try {
seek(position + 4); // skip escape
- in.readFully(syncCheck);
- int syncLen = sync.length;
- for (int i = 0; in.getPos() < end; i++) {
- int j = 0;
- for (; j < syncLen; j++) {
- if (sync[j] != syncCheck[(i + j) % syncLen]) {
- break;
+
+ int prefix = sync.length;
+ int n = conf.getInt("io.bytes.per.checksum", 512);
+ byte[] buffer = new byte[prefix + n];
+ n = (int) Math.min(n, end - in.getPos());
+ /* fill array with a pattern that will never match sync */
+ Arrays.fill(buffer, (byte) (~sync[0]));
+ while (n > 0 && (in.getPos() + n) <= end) {
+ position = in.getPos();
+ in.readFully(buffer, prefix, n);
+ /* the buffer has n+sync bytes */
+ for (int i = 0; i < n; i++) {
+ int j;
+ for (j = 0; j < sync.length && sync[j] == buffer[i + j]; j++) {
+ /* nothing */
+ }
+ if (j == sync.length) {
+ /* simplified from (position + (i - prefix) + sync.length) - SYNC_SIZE */
+ in.seek(position + i - SYNC_SIZE);
+ return;
}
}
- if (j == syncLen) {
- in.seek(in.getPos() - SYNC_SIZE); // position before
- // sync
- return;
- }
- syncCheck[i % syncLen] = in.readByte();
+ /* move the last 16 bytes to the prefix area */
+ System.arraycopy(buffer, buffer.length - prefix, buffer, 0, prefix);
+ n = (int) Math.min(n, end - in.getPos());
}
} catch (ChecksumException e) { // checksum failure
handleChecksumException(e);
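
The rewritten sync() above trades the old one-byte-at-a-time scan for a chunked search: each read fills the buffer behind a 16-byte prefix carried over from the previous chunk, so a sync marker that straddles a chunk boundary is still found, and the prefix is pre-filled with a byte that can never match sync[0]. A self-contained sketch of the same sliding search over an in-memory array (the patch applies it to an FSDataInputStream):

    import java.util.Arrays;

    public class SyncSearchSketch {
      // Returns the offset in data where sync first starts, or -1 if absent.
      static long findSync(byte[] data, byte[] sync, int chunkSize) {
        int prefix = sync.length;
        byte[] buffer = new byte[prefix + chunkSize];
        Arrays.fill(buffer, (byte) ~sync[0]);   // pattern that cannot match the marker
        int pos = 0;
        while (pos < data.length) {
          int n = Math.min(chunkSize, data.length - pos);
          System.arraycopy(data, pos, buffer, prefix, n);
          for (int i = 0; i < n; i++) {
            int j = 0;
            while (j < sync.length && sync[j] == buffer[i + j]) {
              j++;
            }
            if (j == sync.length) {
              return pos + i - prefix;          // a match may begin inside the carried prefix
            }
          }
          System.arraycopy(buffer, buffer.length - prefix, buffer, 0, prefix); // carry the tail
          pos += n;
        }
        return -1;
      }
    }
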
@@ -1403,7 +1499,6 @@ public class RCFile {
private int compressedKeyLen = 0;
NonSyncDataInputBuffer keyDataIn = new NonSyncDataInputBuffer();
NonSyncDataInputBuffer keyDecompressBuffer = new NonSyncDataInputBuffer();
- NonSyncDataOutputBuffer keyTempBuffer = new NonSyncDataOutputBuffer();
KeyBuffer currentKey = null;
boolean keyInit = false;
@@ -1418,17 +1513,31 @@ public class RCFile {
currentKeyLength = in.readInt();
compressedKeyLen = in.readInt();
if (decompress) {
- keyTempBuffer.reset();
- keyTempBuffer.write(in, compressedKeyLen);
- keyDecompressBuffer.reset(keyTempBuffer.getData(), compressedKeyLen);
- CompressionInputStream deflatFilter = codec.createInputStream(
- keyDecompressBuffer, keyDecompressor);
- DataInputStream compressedIn = new DataInputStream(deflatFilter);
- deflatFilter.resetState();
- keyDecompressedData.reset();
- keyDecompressedData.write(compressedIn, currentKeyLength);
- keyDataIn.reset(keyDecompressedData.getData(), currentKeyLength);
+
+ byte[] compressedBytes = new byte[compressedKeyLen];
+ in.readFully(compressedBytes, 0, compressedKeyLen);
+
+ if (keyDecompressor != null) keyDecompressor.reset();
+ keyDecompressBuffer.reset(compressedBytes, compressedKeyLen);
+
+ DataInputStream is;
+ if (codec instanceof SplittableCompressionCodec) {
+ SplitCompressionInputStream deflatFilter = ((SplittableCompressionCodec) codec).createInputStream(
+ keyDecompressBuffer, keyDecompressor, 0, compressedKeyLen, SplittableCompressionCodec.READ_MODE.BYBLOCK);
+
+ keyDecompressBuffer.seek(deflatFilter.getAdjustedStart());
+ is = new DataInputStream(deflatFilter);
+ } else {
+ CompressionInputStream deflatFilter = codec.createInputStream(keyDecompressBuffer, keyDecompressor);
+ is = new DataInputStream(deflatFilter);
+ }
+
+ byte[] deCompressedBytes = new byte[currentKeyLength];
+
+ is.readFully(deCompressedBytes, 0, currentKeyLength);
+ keyDataIn.reset(deCompressedBytes, currentKeyLength);
currentKey.readFields(keyDataIn);
+ is.close();
} else {
currentKey.readFields(in);
}
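
Instead of streaming the compressed key through reusable temp buffers, the patch now reads the whole compressed key into an array and inflates it in one shot, with a dedicated branch for SplittableCompressionCodec. A minimal sketch of the plain (non-splittable) branch against Hadoop's codec API; the codec and decompressor are assumed to have been obtained from the file header as in initHeader():

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.Decompressor;

    public class KeyDecompressSketch {
      // Inflate one compressed key block to its known uncompressed length.
      static byte[] decompressKey(CompressionCodec codec, Decompressor decompressor,
                                  byte[] compressed, int uncompressedLen) throws IOException {
        decompressor.reset();    // pooled decompressors must be reset before reuse
        CompressionInputStream in =
            codec.createInputStream(new ByteArrayInputStream(compressed), decompressor);
        try (DataInputStream is = new DataInputStream(in)) {
          byte[] out = new byte[uncompressedLen];
          is.readFully(out, 0, uncompressedLen);   // the record header carries the exact length
          return out;
        }
      }
    }
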
@@ -1441,12 +1550,20 @@ public class RCFile {
for (int selIx = 0; selIx < selectedColumns.length; selIx++) {
SelectedColumn col = selectedColumns[selIx];
+ if (col == null) {
+ col = new SelectedColumn();
+ col.isNulled = true;
+ selectedColumns[selIx] = col;
+ continue;
+ }
+
int colIx = col.colIndex;
- NonSyncDataOutputBuffer buf = currentKey.allCellValLenBuffer[colIx];
+ NonSyncByteArrayOutputStream buf = currentKey.allCellValLenBuffer[colIx];
colValLenBufferReadIn[selIx].reset(buf.getData(), buf.getLength());
col.rowReadIndex = 0;
col.runLength = 0;
col.prvLength = -1;
+ col.isNulled = buf.getLength() == 0;
}
return currentKeyLength;
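
Two null cases are introduced here: a selected slot with no SelectedColumn at all (a projected index the file does not have) and a column whose value-length buffer is empty; both are surfaced through the new isNulled flag that getCurrentRow() later turns into NullDatum. A small sketch of just that decision, with SelectedColumn reduced to the field involved:

    public class NullColumnSketch {
      // Reduced stand-in for the patch's SelectedColumn.
      static class SelectedColumn {
        boolean isNulled;
      }

      // A slot is null when the file lacks the column entirely (null entry)
      // or when its value-length buffer holds zero bytes.
      static SelectedColumn markNulls(SelectedColumn col, int lenBufferLength) {
        if (col == null) {
          col = new SelectedColumn();
          col.isNulled = true;
        } else {
          col.isNulled = (lenBufferLength == 0);
        }
        return col;
      }
    }
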
@@ -1462,106 +1579,41 @@ public class RCFile {
currentValue.inited = true;
}
- public boolean nextBlock() throws IOException {
- int keyLength = nextKeyBuffer();
- if(keyLength > 0) {
- currentValueBuffer();
- return true;
- }
- return false;
- }
-
private boolean rowFetched = false;
- // use this buffer to hold column's cells value length for usages in
- // getColumn(), instead of using colValLenBufferReadIn directly.
- private final NonSyncDataInputBuffer fetchColumnTempBuf = new NonSyncDataInputBuffer();
-
- /**
- * Fetch all data in the buffer for a given column. This is useful for
- * columnar operators, which perform operations on an array data of one
- * column. It should be used together with {@link #nextColumnsBatch()}.
- * Calling getColumn() with not change the result of
- * {@link #next(LongWritable)} and
- * {@link #getCurrentRow(BytesRefArrayWritable)}.
- *
- * @param columnID the number of the column to get 0 to N-1
- * @throws IOException
- */
- public BytesRefArrayWritable getColumn(int columnID,
- BytesRefArrayWritable rest) throws IOException {
- int selColIdx = revPrjColIDs[columnID];
- if (selColIdx == -1) {
+ @Override
+ public Tuple next() throws IOException {
+ if (!more) {
return null;
}
- if (rest == null) {
- rest = new BytesRefArrayWritable();
- }
-
- rest.resetValid(recordsNumInValBuffer);
-
- if (!currentValue.inited) {
- currentValueBuffer();
- }
-
- int columnNextRowStart = 0;
- fetchColumnTempBuf.reset(currentKey.allCellValLenBuffer[columnID]
- .getData(), currentKey.allCellValLenBuffer[columnID].getLength());
- SelectedColumn selCol = selectedColumns[selColIdx];
- byte[] uncompData = null;
- ValueBuffer.LazyDecompressionCallbackImpl decompCallBack = null;
- boolean decompressed = currentValue.decompressedFlag[selColIdx];
- if (decompressed) {
- uncompData =
- currentValue.loadedColumnsValueBuffer[selColIdx].getData();
- } else {
- decompCallBack = currentValue.lazyDecompressCallbackObjs[selColIdx];
+ more = nextBuffer(rowId);
+ long lastSeenSyncPos = lastSeenSyncPos();
+ if (lastSeenSyncPos >= endOffset) {
+ more = false;
+ return null;
}
- for (int i = 0; i < recordsNumInValBuffer; i++) {
- colAdvanceRow(selColIdx, selCol);
- int length = selCol.prvLength;
-
- BytesRefWritable currentCell = rest.get(i);
- if (decompressed) {
- currentCell.set(uncompData, columnNextRowStart, length);
- } else {
- currentCell.set(decompCallBack, columnNextRowStart, length);
- }
- columnNextRowStart = columnNextRowStart + length;
+ if (!more) {
+ return null;
}
- return rest;
- }
- /**
- * Read in next key buffer and throw any data in current key buffer and
- * current value buffer. It will influence the result of
- * {@link #next(LongWritable)} and
- * {@link #getCurrentRow(BytesRefArrayWritable)}
- *
- * @return whether there still has records or not
- * @throws IOException
- */
- @SuppressWarnings("unused")
- @Deprecated
- public synchronized boolean nextColumnsBatch() throws IOException {
- passedRowsNum += (recordsNumInValBuffer - readRowsIndexInBuffer);
- return nextKeyBuffer() > 0;
+ Tuple tuple = new VTuple(schema.getColumnNum());
+ getCurrentRow(tuple);
+ return tuple;
}
/**
- * Returns how many rows we fetched with next(). It only means how many rows
- * are read by next(). The returned result may be smaller than actual number
- * of rows passed by, because {@link #seek(long)},
- * {@link #nextColumnsBatch()} can change the underlying key buffer and
+ * Returns how many rows have been fetched with nextBuffer(). This counts only the rows
+ * read by nextBuffer(); the result may be smaller than the actual number
+ * of rows passed by, because {@link #seek(long)} can change the underlying key buffer and
* value buffer.
*
* @return next row number
* @throws IOException
*/
- public synchronized boolean next(LongWritable readRows) throws IOException {
- if (hasRecordsInBuffer()) {
+ public synchronized boolean nextBuffer(LongWritable readRows) throws IOException {
+ if (readRowsIndexInBuffer < recordsNumInValBuffer) {
readRows.set(passedRowsNum);
readRowsIndexInBuffer++;
passedRowsNum++;
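
With getColumn() and nextColumnsBatch() removed, the only read path left is the Scanner-style next() in this hunk: it drains the current value buffer through nextBuffer() and materializes one Tuple per row, returning null at the end of the fragment. A hedged usage sketch against Tajo's Scanner interface (countRows is a hypothetical driver, not part of the patch):

    import java.io.IOException;
    import org.apache.tajo.storage.Scanner;

    public class ScanSketch {
      // Drain any Scanner; RCFile.RCFileScanner is one implementation.
      static long countRows(Scanner scanner) throws IOException {
        long rows = 0;
        try {
          while (scanner.next() != null) {   // null signals the end of the fragment
            rows++;
          }
        } finally {
          scanner.close();                   // also returns the pooled key decompressor
        }
        return rows;
      }
    }
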
@@ -1572,109 +1624,41 @@ public class RCFile {
}
int ret = -1;
- if (tolerateCorruptions) {
- ret = nextKeyValueTolerateCorruptions();
- } else {
- try {
- ret = nextKeyBuffer();
- } catch (EOFException eof) {
- eof.printStackTrace();
- }
- }
- return (ret > 0) && next(readRows);
- }
-
- private int nextKeyValueTolerateCorruptions() throws IOException {
- long currentOffset = in.getPos();
- int ret;
try {
ret = nextKeyBuffer();
- this.currentValueBuffer();
- } catch (IOException ioe) {
- // A BlockMissingException indicates a temporary error,
- // not a corruption. Re-throw this exception.
- String msg = ioe.getMessage();
- if (msg != null && msg.startsWith(BLOCK_MISSING_MESSAGE)) {
- LOG.warn("Re-throwing block-missing exception" + ioe);
- throw ioe;
- }
- // We have an IOException other than a BlockMissingException.
- LOG.warn("Ignoring IOException in file " + file +
- " after offset " + currentOffset, ioe);
- ret = -1;
- } catch (Throwable t) {
- // We got an exception that is not IOException
- // (typically OOM, IndexOutOfBounds, InternalError).
- // This is most likely a corruption.
- LOG.warn("Ignoring unknown error in " + file +
- " after offset " + currentOffset, t);
- ret = -1;
+ } catch (EOFException eof) {
+ eof.printStackTrace();
}
- return ret;
- }
-
- public boolean hasRecordsInBuffer() {
- return readRowsIndexInBuffer < recordsNumInValBuffer;
+ return (ret > 0) && nextBuffer(readRows);
}
/**
- * get the current row used,make sure called {@link #next(LongWritable)}
+ * Get the current row; make sure {@link #next()} was called
* first.
*
* @throws IOException
*/
- public synchronized void getCurrentRow(BytesRefArrayWritable ret) throws IOException {
-
+ public void getCurrentRow(Tuple tuple) throws IOException {
if (!keyInit || rowFetched) {
return;
}
- if (tolerateCorruptions) {
- if (!currentValue.inited) {
- currentValueBuffer();
- }
- ret.resetValid(columnNumber);
- } else {
- if (!currentValue.inited) {
- currentValueBuffer();
- // do this only when not initialized, but we may need to find a way to
- // tell the caller how to initialize the valid size
- ret.resetValid(columnNumber);
- }
+ if (!currentValue.inited) {
+ currentValueBuffer();
}
- // we do not use BytesWritable here to avoid the byte-copy from
- // DataOutputStream to BytesWritable
- if (currentValue.numCompressed > 0) {
- for (int j = 0; j < selectedColumns.length; ++j) {
- SelectedColumn col = selectedColumns[j];
- int i = col.colIndex;
-
- BytesRefWritable ref = ret.unCheckedGet(i);
+ for (int j = 0; j < selectedColumns.length; ++j) {
+ SelectedColumn col = selectedColumns[j];
+ int i = col.colIndex;
+ if (col.isNulled) {
+ tuple.put(i, NullDatum.get());
+ } else {
colAdvanceRow(j, col);
- if (currentValue.decompressedFlag[j]) {
- ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
- col.rowReadIndex, col.prvLength);
- } else {
- ref.set(currentValue.lazyDecompressCallbackObjs[j],
- col.rowReadIndex, col.prvLength);
- }
- col.rowReadIndex += col.prvLength;
- }
- } else {
- // This version of the loop eliminates a condition check and branch
- // and is measurably faster (20% or so)
- for (int j = 0; j < selectedColumns.length; ++j) {
- SelectedColumn col = selectedColumns[j];
- int i = col.colIndex;
-
- BytesRefWritable ref = ret.unCheckedGet(i);
-
- colAdvanceRow(j, col);
- ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
- col.rowReadIndex, col.prvLength);
+ Datum datum = serde.deserialize(schema.getColumn(i),
+ currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength, nullChars);
+ tuple.put(i, datum);
col.rowReadIndex += col.prvLength;
}
}
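
getCurrentRow() no longer hands out BytesRefWritable views into the buffer; each selected column's slice is decoded immediately into a Datum by the pluggable serde, and nulled columns short-circuit to NullDatum. The per-cell step in isolation, with the serde behind a hypothetical stand-in interface (the real call is serde.deserialize(column, data, offset, length, nullChars)):

    import org.apache.tajo.catalog.Column;
    import org.apache.tajo.datum.Datum;
    import org.apache.tajo.datum.NullDatum;

    public class DeserializeSketch {
      // Hypothetical reduction of Tajo's SerializeDeserialize for this sketch.
      interface SerDe {
        Datum deserialize(Column column, byte[] data, int offset, int length);
      }

      // One cell of one row: null columns never touch the value buffer.
      static Datum readCell(SerDe serde, Column column, boolean isNulled,
                            byte[] columnBuffer, int offset, int length) {
        if (isNulled) {
          return NullDatum.get();
        }
        return serde.deserialize(column, columnBuffer, offset, length);
      }
    }
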
@@ -1683,9 +1667,10 @@ public class RCFile {
/**
* Advance column state to the next row: update offsets, run lengths, etc.
+ *
* @param selCol - index among selectedColumns
- * @param col - column object to update the state of. prvLength will be
- * set to the new read position
+ * @param col - column object to update the state of. prvLength will be
+ * set to the new read position
* @throws IOException
*/
private void colAdvanceRow(int selCol, SelectedColumn col) throws IOException {
@@ -1704,75 +1689,57 @@ public class RCFile {
}
}
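
colAdvanceRow() itself sits outside this hunk, but the state it maintains (prvLength, runLength) decodes RCFile's run-length-compressed column value lengths: a non-negative vlong introduces a new length, a negative one says how often the previous length repeats. A hedged sketch of that convention, which should be checked against KeyBuffer's actual encoding before relying on it:

    import java.io.DataInput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableUtils;

    public class RunLengthSketch {
      int prvLength = -1;  // last concrete value length seen
      int runLength = 0;   // remaining repeats of prvLength

      // Advance to the next cell's length; afterwards prvLength holds it.
      void advance(DataInput lengths) throws IOException {
        if (runLength > 0) {
          --runLength;                  // still inside a run, keep prvLength
        } else {
          int length = (int) WritableUtils.readVLong(lengths);
          if (length < 0) {
            runLength = (~length) - 1;  // run marker: repeat prvLength this many more times
          } else {
            prvLength = length;         // a fresh literal length
          }
        }
      }
    }
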
- /** Returns true iff the previous call to next passed a sync mark. */
- @SuppressWarnings("unused")
+ /**
+ * Returns true if the previous call to next passed a sync mark.
+ */
public boolean syncSeen() {
return syncSeen;
}
- /** Returns the last seen sync position. */
+ /**
+ * Returns the last seen sync position.
+ */
public long lastSeenSyncPos() {
return lastSeenSyncPos;
}
- /** Returns the name of the file. */
- @Override
- public String toString() {
- return file.toString();
- }
-
- @SuppressWarnings("unused")
- public boolean isCompressedRCFile() {
- return this.decompress;
- }
-
- /** Close the reader. */
- public void close() {
- IOUtils.closeStream(in);
- currentValue.close();
- if (decompress) {
- IOUtils.closeStream(keyDecompressedData);
- org.apache.tajo.storage.compress.CodecPool.returnDecompressor(keyDecompressor);
- }
- }
-
/**
- * return the KeyBuffer object used in the reader. Internally in each
- * reader, there is only one KeyBuffer object, which gets reused for every
- * block.
+ * Returns the name of the file.
*/
- public KeyBuffer getCurrentKeyBufferObj() {
- return this.currentKey;
+ @Override
+ public String toString() {
+ return fragment.getPath().toString();
}
- /**
- * return the ValueBuffer object used in the reader. Internally in each
- * reader, there is only one ValueBuffer object, which gets reused for every
- * block.
- */
- public ValueBuffer getCurrentValueBufferObj() {
- return this.currentValue;
+ @Override
+ public void reset() throws IOException {
+ seek(startOffset);
}
- //return the current block's length
- public int getCurrentBlockLength() {
- return this.currentRecordLength;
+ @Override
+ public boolean isProjectable() {
+ return true;
}
- //return the current block's key length
- public int getCurrentKeyLength() {
- return this.currentKeyLength;
+ @Override
+ public boolean isSelectable() {
+ return false;
}
- //return the current block's compressed key length
- public int getCurrentCompressedKeyLen() {
- return this.compressedKeyLen;
+ @Override
+ public boolean isSplittable() {
+ return true;
}
- //return the CompressionCodec used for this file
- public CompressionCodec getCompressionCodec() {
- return this.codec;
+ @Override
+ public void close() throws IOException {
+ IOUtils.closeStream(in);
+ currentValue.close();
+ if (keyDecompressor != null) {
+ // Make sure we return the decompressor only once.
+ org.apache.tajo.storage.compress.CodecPool.returnDecompressor(keyDecompressor);
+ keyDecompressor = null;
+ }
}
-
}
}
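
A detail worth noting in the new close(): keyDecompressor is nulled right after it is handed back, so a second close() cannot return the same object to the CodecPool twice. The general guard-and-null pattern, sketched with a stand-in pool:

    public class IdempotentCloseSketch {
      interface Pool<T> { void release(T t); }

      static class Holder<T> implements AutoCloseable {
        private T resource;
        private final Pool<T> pool;

        Holder(T resource, Pool<T> pool) {
          this.resource = resource;
          this.pool = pool;
        }

        @Override
        public void close() {
          if (resource != null) {   // only the first close() releases
            pool.release(resource);
            resource = null;        // later calls are no-ops
          }
        }
      }
    }
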