Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 06:59:39 UTC
[05/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
new file mode 100644
index 0000000..d88223b
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -0,0 +1,1807 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.Closeable;
+import java.io.*;
+import java.rmi.server.UID;
+import java.security.MessageDigest;
+import java.util.Arrays;
+
+/**
+ * <code>RCFile</code>s, short for Record Columnar File, are flat files
+ * consisting of binary key/value pairs, and share much similarity with
+ * <code>SequenceFile</code>.
+ * <p/>
+ * RCFile stores columns of a table in a record columnar way. It first
+ * partitions rows horizontally into row splits, and then it vertically
+ * partitions each row split in a columnar way. RCFile first stores the
+ * metadata of a row split as the key part of a record, and all the data of a
+ * row split as the value part. When writing, {@link RCFileAppender} first holds records'
+ * value bytes in memory, and determines a row split when the raw byte size of
+ * the buffered records overflows a given parameter <tt>Writer.columnsBufferSize</tt>,
+ * which can be set like: <code>conf.setInt(COLUMNS_BUFFER_SIZE_CONF_STR,
+ * 4 * 1024 * 1024)</code>.
+ * <p>
+ * <code>RCFile</code> provides {@link RCFileAppender} and {@link RCFileScanner} classes for
+ * writing and reading, respectively.
+ * </p>
+ * <p/>
+ * <p>
+ * RCFile compresses values in a more fine-grained manner than record-level
+ * compression. However, it currently does not support compressing the key part
+ * yet. The actual compression algorithm used to compress key and/or values can
+ * be specified by using the appropriate {@link org.apache.hadoop.io.compress.CompressionCodec}.
+ * </p>
+ * <p/>
+ * <p>
+ * The {@link RCFileScanner} is used to read and interpret the bytes of an RCFile.
+ * </p>
+ * <p/>
+ * <h4 id="Formats">RCFile Formats</h4>
+ * <p/>
+ * <p/>
+ * <h5 id="Header">RC Header</h5>
+ * <ul>
+ * <li>version - 3 bytes of magic header <b>RCF</b>, followed by 1 byte of
+ * actual version number (e.g. RCF1)</li>
+ * <li>compression - A boolean which specifies if compression is turned on for
+ * keys/values in this file.</li>
+ * <li>compression codec - <code>CompressionCodec</code> class which is used
+ * for compression of keys and/or values (if compression is enabled).</li>
+ * <li>metadata - {@link org.apache.hadoop.io.SequenceFile.Metadata} for this file.</li>
+ * <li>sync - A sync marker to denote end of the header.</li>
+ * </ul>
+ * <p/>
+ * <h5>RCFile Format</h5>
+ * <ul>
+ * <li><a href="#Header">Header</a></li>
+ * <li>Record
+ * <li>Key part
+ * <ul>
+ * <li>Record length in bytes</li>
+ * <li>Key length in bytes</li>
+ * <li>Number_of_rows_in_this_record(vint)</li>
+ * <li>Column_1_ondisk_length(vint)</li>
+ * <li>Column_1_row_1_value_plain_length</li>
+ * <li>Column_1_row_2_value_plain_length</li>
+ * <li>...</li>
+ * <li>Column_2_ondisk_length(vint)</li>
+ * <li>Column_2_row_1_value_plain_length</li>
+ * <li>Column_2_row_2_value_plain_length</li>
+ * <li>...</li>
+ * </ul>
+ * </li>
+ * </li>
+ * <li>Value part
+ * <ul>
+ * <li>Compressed or plain data of [column_1_row_1_value,
+ * column_1_row_2_value,....]</li>
+ * <li>Compressed or plain data of [column_2_row_1_value,
+ * column_2_row_2_value,....]</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p>
+ * <pre>
+ * {@code
+ * The following is a pseudo-BNF grammar for RCFile. Comments are prefixed
+ * with dashes:
+ *
+ * rcfile ::=
+ * <file-header>
+ * <rcfile-rowgroup>+
+ *
+ * file-header ::=
+ * <file-version-header>
+ * <file-key-class-name> (only exists if version is seq6)
+ * <file-value-class-name> (only exists if version is seq6)
+ * <file-is-compressed>
+ * <file-is-block-compressed> (only exists if version is seq6)
+ * [<file-compression-codec-class>]
+ * <file-header-metadata>
+ * <file-sync-field>
+ *
+ * -- The normative RCFile implementation included with Hive is actually
+ * -- based on a modified version of Hadoop's SequenceFile code. Some
+ * -- things which should have been modified were not, including the code
+ * -- that writes out the file version header. Consequently, RCFile and
+ * -- SequenceFile originally shared the same version header. A newer
+ * -- release has created a unique version string.
+ *
+ * file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
+ * | Byte[4] {'R', 'C', 'F', 1}
+ *
+ * -- The name of the Java class responsible for reading the key buffer
+ * -- component of the rowgroup.
+ *
+ * file-key-class-name ::=
+ * Text {"org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"}
+ *
+ * -- The name of the Java class responsible for reading the value buffer
+ * -- component of the rowgroup.
+ *
+ * file-value-class-name ::=
+ * Text {"org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer"}
+ *
+ * -- Boolean variable indicating whether or not the file uses compression
+ * -- for the key and column buffer sections.
+ *
+ * file-is-compressed ::= Byte[1]
+ *
+ * -- A boolean field indicating whether or not the file is block compressed.
+ * -- This field is *always* false. According to comments in the original
+ * -- RCFile implementation this field was retained for backwards
+ * -- compatibility with the SequenceFile format.
+ *
+ * file-is-block-compressed ::= Byte[1] {false}
+ *
+ * -- The Java class name of the compression codec iff <file-is-compressed>
+ * -- is true. The named class must implement
+ * -- org.apache.hadoop.io.compress.CompressionCodec.
+ * -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
+ *
+ * file-compression-codec-class ::= Text
+ *
+ * -- A collection of key-value pairs defining metadata values for the
+ * -- file. The Map is serialized using standard JDK serialization, i.e.
+ * -- an Int corresponding to the number of key-value pairs, followed by
+ * -- Text key and value pairs. The following metadata properties are
+ * -- mandatory for all RCFiles:
+ * --
+ * -- hive.io.rcfile.column.number: the number of columns in the RCFile
+ *
+ * file-header-metadata ::= Map<Text, Text>
+ *
+ * -- A 16 byte marker that is generated by the writer. This marker appears
+ * -- at regular intervals at the beginning of rowgroup-headers, and is
+ * -- intended to enable readers to skip over corrupted rowgroups.
+ *
+ * file-sync-hash ::= Byte[16]
+ *
+ * -- Each row group is split into three sections: a header, a set of
+ * -- key buffers, and a set of column buffers. The header section includes
+ * -- an optional sync hash, information about the size of the row group, and
+ * -- the total number of rows in the row group. Each key buffer
+ * -- consists of run-length encoding data which is used to decode
+ * -- the length and offsets of individual fields in the corresponding column
+ * -- buffer.
+ *
+ * rcfile-rowgroup ::=
+ * <rowgroup-header>
+ * <rowgroup-key-data>
+ * <rowgroup-column-buffers>
+ *
+ * rowgroup-header ::=
+ * [<rowgroup-sync-marker>, <rowgroup-sync-hash>]
+ * <rowgroup-record-length>
+ * <rowgroup-key-length>
+ * <rowgroup-compressed-key-length>
+ *
+ * -- rowgroup-key-data is compressed if the column data is compressed.
+ * rowgroup-key-data ::=
+ * <rowgroup-num-rows>
+ * <rowgroup-key-buffers>
+ *
+ * -- An integer (always -1) signaling the beginning of a sync-hash
+ * -- field.
+ *
+ * rowgroup-sync-marker ::= Int
+ *
+ * -- A 16 byte sync field. This must match the <file-sync-hash> value read
+ * -- in the file header.
+ *
+ * rowgroup-sync-hash ::= Byte[16]
+ *
+ * -- The record-length is the sum of the number of bytes used to store
+ * -- the key and column parts, i.e. it is the total length of the current
+ * -- rowgroup.
+ *
+ * rowgroup-record-length ::= Int
+ *
+ * -- Total length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-key-length ::= Int
+ *
+ * -- Total compressed length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-compressed-key-length ::= Int
+ *
+ * -- Number of rows in the current rowgroup.
+ *
+ * rowgroup-num-rows ::= VInt
+ *
+ * -- One or more column key buffers corresponding to each column
+ * -- in the RCFile.
+ *
+ * rowgroup-key-buffers ::= <rowgroup-key-buffer>+
+ *
+ * -- Data in each column buffer is stored using a run-length
+ * -- encoding scheme that is intended to reduce the cost of
+ * -- repeated column field values. This mechanism is described
+ * -- in more detail in the following entries.
+ *
+ * rowgroup-key-buffer ::=
+ * <column-buffer-length>
+ * <column-buffer-uncompressed-length>
+ * <column-key-buffer-length>
+ * <column-key-buffer>
+ *
+ * -- The serialized length on disk of the corresponding column buffer.
+ *
+ * column-buffer-length ::= VInt
+ *
+ * -- The uncompressed length of the corresponding column buffer. This
+ * -- is equivalent to column-buffer-length if the RCFile is not compressed.
+ *
+ * column-buffer-uncompressed-length ::= VInt
+ *
+ * -- The length in bytes of the current column key buffer
+ *
+ * column-key-buffer-length ::= VInt
+ *
+ * -- The column-key-buffer contains a sequence of serialized VInt values
+ * -- corresponding to the byte lengths of the serialized column fields
+ * -- in the corresponding rowgroup-column-buffer. For example, consider
+ * -- an integer column that contains the consecutive values 1, 2, 3, 44.
+ * -- The RCFile format stores these values as strings in the column buffer,
+ * -- e.g. "12344". The length of each column field is recorded in
+ * -- the column-key-buffer as a sequence of VInts: 1,1,1,2. However,
+ * -- if the same length occurs repeatedly, then we replace repeated
+ * -- run lengths with the complement (i.e. negative) of the number of
+ * -- repetitions, so 1,1,1,2 becomes 1,~2,2.
+ *
+ * column-key-buffer ::= Byte[column-key-buffer-length]
+ *
+ * rowgroup-column-buffers ::= <rowgroup-column-buffer>+
+ *
+ * -- RCFile stores all column data as strings regardless of the
+ * -- underlying column type. The strings are neither length-prefixed nor
+ * -- null-terminated, and decoding them into individual fields requires
+ * -- the use of the run-length information contained in the corresponding
+ * -- column-key-buffer.
+ *
+ * rowgroup-column-buffer ::= Byte[column-buffer-length]
+ *
+ * Byte ::= An eight-bit byte
+ *
+ * VInt ::= Variable length integer. The high-order bit of each byte
+ * indicates whether more bytes remain to be read. The low-order seven
+ * bits are appended as increasingly more significant bits in the
+ * resulting integer value.
+ *
+ * Int ::= A four-byte integer in big-endian format.
+ *
+ * Text ::= VInt, Chars (Length prefixed UTF-8 characters)
+ * }
+ * </pre>
+ * </p>
+ */
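
The class comment above describes the buffering and on-disk format; as a quick orientation,
the following is a minimal, hypothetical usage sketch of the appender and scanner defined in
this file. The conf, taskAttemptId, schema, meta, workDir and fragment objects are assumed
to be prepared by the caller and are not part of this commit.

    // Sketch only: write tuples with RCFileAppender, then read them back with RCFileScanner.
    static void roundTrip(TajoConf conf, QueryUnitAttemptId taskAttemptId, Schema schema,
                          TableMeta meta, Path workDir, Fragment fragment) throws IOException {
      // flush a row group once ~4MB of column data has been buffered
      conf.setInt(RCFile.RCFileAppender.COLUMNS_BUFFER_SIZE_CONF_STR, 4 * 1024 * 1024);

      RCFile.RCFileAppender appender =
          new RCFile.RCFileAppender(conf, taskAttemptId, schema, meta, workDir);
      appender.init();
      // appender.addTuple(tuple);   // each tuple is buffered per column until a flush
      appender.flush();
      appender.close();

      RCFile.RCFileScanner scanner = new RCFile.RCFileScanner(conf, schema, meta, fragment);
      scanner.init();
      Tuple row;
      while ((row = scanner.next()) != null) {
        // consume row
      }
      scanner.close();
    }
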
+public class RCFile {
+
+ private static final Log LOG = LogFactory.getLog(RCFile.class);
+
+ public static final String RECORD_INTERVAL_CONF_STR = "hive.io.rcfile.record.interval";
+ public static final String COLUMN_NUMBER_METADATA_STR = "hive.io.rcfile.column.number";
+
+ // All of the versions should be placed in this list.
+ private static final int ORIGINAL_VERSION = 0; // version with SEQ
+ private static final int NEW_MAGIC_VERSION = 1; // version with RCF
+
+ private static final int CURRENT_VERSION = NEW_MAGIC_VERSION;
+
+ // The first version of RCFile used the sequence file header.
+ private static final byte[] ORIGINAL_MAGIC = new byte[]{
+ (byte) 'S', (byte) 'E', (byte) 'Q'};
+ // the version that was included with the original magic, which is mapped
+ // into ORIGINAL_VERSION
+ private static final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;
+
+ private static final byte[] ORIGINAL_MAGIC_VERSION = new byte[]{
+ (byte) 'S', (byte) 'E', (byte) 'Q', ORIGINAL_MAGIC_VERSION_WITH_METADATA
+ };
+
+ // The 'magic' bytes at the beginning of the RCFile
+ private static final byte[] MAGIC = new byte[]{
+ (byte) 'R', (byte) 'C', (byte) 'F'};
+
+ private static final int SYNC_ESCAPE = -1; // "length" of sync entries
+ private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
+ private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash
+
+ /**
+ * The number of bytes between sync points.
+ */
+ public static final int SYNC_INTERVAL = 100 * SYNC_SIZE;
+ public static final String NULL = "rcfile.null";
+ public static final String SERDE = "rcfile.serde";
+
+ /**
+ * KeyBuffer is the key of each record in RCFile. Its on-disk layout is as
+ * below:
+ * <p/>
+ * <ul>
+ * <li>record length in bytes; it is the sum of the bytes used to store the key
+ * part and the value part.</li>
+ * <li>key length in bytes; it is how many bytes are used by the key part.</li>
+ * <li>number_of_rows_in_this_record(vint),</li>
+ * <li>column_1_ondisk_length(vint),</li>
+ * <li>column_1_row_1_value_plain_length,</li>
+ * <li>column_1_row_2_value_plain_length,</li>
+ * <li>....</li>
+ * <li>column_2_ondisk_length(vint),</li>
+ * <li>column_2_row_1_value_plain_length,</li>
+ * <li>column_2_row_2_value_plain_length,</li>
+ * <li>.... .</li>
+ * <li>{the end of the key part}</li>
+ * </ul>
+ */
+ public static class KeyBuffer {
+ // each column's length in the value
+ private int[] eachColumnValueLen = null;
+ private int[] eachColumnUncompressedValueLen = null;
+ // stores each cell's length of a column in one DataOutputBuffer element
+ private NonSyncByteArrayOutputStream[] allCellValLenBuffer = null;
+ // how many rows in this split
+ private int numberRows = 0;
+ // how many columns
+ private int columnNumber = 0;
+
+ KeyBuffer(int columnNum) {
+ columnNumber = columnNum;
+ eachColumnValueLen = new int[columnNumber];
+ eachColumnUncompressedValueLen = new int[columnNumber];
+ allCellValLenBuffer = new NonSyncByteArrayOutputStream[columnNumber];
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ eachColumnValueLen = new int[columnNumber];
+ eachColumnUncompressedValueLen = new int[columnNumber];
+ allCellValLenBuffer = new NonSyncByteArrayOutputStream[columnNumber];
+
+ numberRows = WritableUtils.readVInt(in);
+ for (int i = 0; i < columnNumber; i++) {
+ eachColumnValueLen[i] = WritableUtils.readVInt(in);
+ eachColumnUncompressedValueLen[i] = WritableUtils.readVInt(in);
+ int bufLen = WritableUtils.readVInt(in);
+ if (allCellValLenBuffer[i] == null) {
+ allCellValLenBuffer[i] = new NonSyncByteArrayOutputStream();
+ } else {
+ allCellValLenBuffer[i].reset();
+ }
+ allCellValLenBuffer[i].write(in, bufLen);
+ }
+ }
+
+ /**
+ * @return the numberRows
+ */
+ public int getNumberRows() {
+ return numberRows;
+ }
+ }
+
+ /**
+ * ValueBuffer is the value of each record in RCFile. Its on-disk layout is as
+ * below:
+ * <ul>
+ * <li>Compressed or plain data of [column_1_row_1_value,
+ * column_1_row_2_value,....]</li>
+ * <li>Compressed or plain data of [column_2_row_1_value,
+ * column_2_row_2_value,....]</li>
+ * </ul>
+ */
+ public static class ValueBuffer implements Closeable{
+
+ // used to load columns' value into memory
+ private NonSyncByteArrayOutputStream[] loadedColumnsValueBuffer = null;
+
+ boolean inited = false;
+
+ // used for readFields
+ KeyBuffer keyBuffer;
+ private int columnNumber = 0;
+
+ // set to true for columns that should be skipped when loading into memory.
+ boolean[] skippedColIDs = null;
+
+ CompressionCodec codec;
+ Decompressor decompressor = null;
+ NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
+ private long readBytes = 0;
+
+
+ public ValueBuffer(KeyBuffer currentKey, int columnNumber,
+ int[] targets, CompressionCodec codec, boolean[] skippedIDs)
+ throws IOException {
+ keyBuffer = currentKey;
+ this.columnNumber = columnNumber;
+ this.skippedColIDs = skippedIDs;
+ this.codec = codec;
+ loadedColumnsValueBuffer = new NonSyncByteArrayOutputStream[targets.length];
+ if (codec != null) {
+ decompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
+ }
+
+ for (int i = 0; i < targets.length; i++) {
+ loadedColumnsValueBuffer[i] = new NonSyncByteArrayOutputStream();
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ int addIndex = 0;
+ int skipTotal = 0;
+
+
+ for (int i = 0; i < columnNumber; i++) {
+ int vaRowsLen = keyBuffer.eachColumnValueLen[i];
+ // skip this column
+ if (skippedColIDs[i]) {
+ skipTotal += vaRowsLen;
+ continue;
+ }
+
+ if (skipTotal != 0) {
+ StorageUtil.skipFully(in, skipTotal);
+ skipTotal = 0;
+ }
+
+ NonSyncByteArrayOutputStream valBuf;
+ if (codec != null) {
+ // load into compressed buf first
+
+ byte[] compressedBytes = new byte[vaRowsLen];
+ in.readFully(compressedBytes, 0, vaRowsLen);
+
+ decompressBuffer.reset(compressedBytes, vaRowsLen);
+ if(decompressor != null) decompressor.reset();
+
+ DataInputStream is;
+ if (codec instanceof SplittableCompressionCodec) {
+ SplitCompressionInputStream deflatFilter = ((SplittableCompressionCodec) codec).createInputStream(
+ decompressBuffer, decompressor, 0, vaRowsLen, SplittableCompressionCodec.READ_MODE.BYBLOCK);
+ is = new DataInputStream(deflatFilter);
+ } else {
+ CompressionInputStream deflatFilter = codec.createInputStream(decompressBuffer, decompressor);
+ is = new DataInputStream(deflatFilter);
+ }
+
+ valBuf = loadedColumnsValueBuffer[addIndex];
+ valBuf.reset();
+ valBuf.write(is, keyBuffer.eachColumnUncompressedValueLen[i]);
+ is.close();
+ decompressBuffer.close();
+ } else {
+ valBuf = loadedColumnsValueBuffer[addIndex];
+ valBuf.reset();
+ valBuf.write(in, vaRowsLen);
+ }
+ readBytes += keyBuffer.eachColumnUncompressedValueLen[i];
+ addIndex++;
+ }
+
+ if (skipTotal != 0) {
+ StorageUtil.skipFully(in, skipTotal);
+ }
+ }
+
+ public long getReadBytes() {
+ return readBytes;
+ }
+
+ public void clearColumnBuffer() throws IOException {
+ decompressBuffer.reset();
+ readBytes = 0;
+ }
+
+ @Override
+ public void close() {
+ for (NonSyncByteArrayOutputStream element : loadedColumnsValueBuffer) {
+ IOUtils.closeStream(element);
+ }
+ if (codec != null) {
+ IOUtils.closeStream(decompressBuffer);
+ if (decompressor != null) {
+ // Make sure we only return decompressor once.
+ org.apache.tajo.storage.compress.CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+ }
+ }
+ }
+
+ /**
+ * Create a metadata object with alternating key-value pairs.
+ * E.g. createMetadata(key1, value1, key2, value2)
+ */
+ public static Metadata createMetadata(Text... values) {
+ if (values.length % 2 != 0) {
+ throw new IllegalArgumentException("Must have a matched set of " +
+ "key-value pairs. " + values.length +
+ " strings supplied.");
+ }
+ Metadata result = new Metadata();
+ for (int i = 0; i < values.length; i += 2) {
+ result.set(values[i], values[i + 1]);
+ }
+ return result;
+ }
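
For illustration, a small hypothetical call to createMetadata; the keys shown are made up
and carry no special meaning to the format (the only mandatory entry, the column count, is
set by the appender itself during init()).

    // Sketch only: alternating key-value Text pairs.
    Metadata md = RCFile.createMetadata(
        new Text("creator"), new Text("tajo"),
        new Text("comment"), new Text("example"));
    // An odd number of arguments triggers the IllegalArgumentException above.
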
+
+ /**
+ * Write KeyBuffer/ValueBuffer pairs to a RCFile. RCFile's format is
+ * compatible with SequenceFile's.
+ */
+ public static class RCFileAppender extends FileAppender {
+ FSDataOutputStream out;
+
+ CompressionCodec codec = null;
+ Metadata metadata = null;
+ FileSystem fs = null;
+ TableStatistics stats = null;
+ int columnNumber = 0;
+
+ // how many records the writer buffers before it writes to disk
+ private int RECORD_INTERVAL = Integer.MAX_VALUE;
+ // the max size of memory for buffering records before writing them out
+ private int COLUMNS_BUFFER_SIZE = 16 * 1024 * 1024; // 16M
+ // the conf string for COLUMNS_BUFFER_SIZE
+ public static final String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
+
+ // how many records already buffered
+ private int bufferedRecords = 0;
+ private ColumnBuffer[] columnBuffers = null;
+ boolean useNewMagic = true;
+ private byte[] nullChars;
+ private SerializerDeserializer serde;
+ private boolean isShuffle;
+
+ // Insert a globally unique 16-byte value every few entries, so that one
+ // can seek into the middle of a file and then synchronize with record
+ // starts and ends by scanning for this value.
+ long lastSyncPos; // position of last sync
+ byte[] sync; // 16 random bytes
+
+ {
+ try {
+ MessageDigest digester = MessageDigest.getInstance("MD5");
+ long time = System.currentTimeMillis();
+ digester.update((new UID() + "@" + time).getBytes());
+ sync = digester.digest();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /*
+ * used for buffering appends before flushing them out
+ */
+ class ColumnBuffer {
+ // used for buffer a column's values
+ NonSyncByteArrayOutputStream columnValBuffer;
+ // used to store each value's length
+ NonSyncByteArrayOutputStream valLenBuffer;
+
+ /*
+ * use a run-length encoding. We only record a run length if the same
+ * 'prevValueLen' occurs more than once, and we negate the run
+ * length to distinguish a run length from a normal value length. For
+ * example, if the values' lengths are 1,1,1,2, we record 1,~2,2; for
+ * value lengths 1,2,3 we record 1,2,3.
+ */
+ int columnValueLength = 0;
+ int uncompressedColumnValueLength = 0;
+ int columnKeyLength = 0;
+ int runLength = 0;
+ int prevValueLength = -1;
+
+ ColumnBuffer() throws IOException {
+ columnValBuffer = new NonSyncByteArrayOutputStream();
+ valLenBuffer = new NonSyncByteArrayOutputStream();
+ }
+
+ public int append(Column column, Datum datum) throws IOException {
+ int currentLen = serde.serialize(column, datum, columnValBuffer, nullChars);
+ columnValueLength += currentLen;
+ uncompressedColumnValueLength += currentLen;
+
+ if (prevValueLength < 0) {
+ startNewGroup(currentLen);
+ return currentLen;
+ }
+
+ if (currentLen != prevValueLength) {
+ flushGroup();
+ startNewGroup(currentLen);
+ } else {
+ runLength++;
+ }
+ return currentLen;
+ }
+
+ private void startNewGroup(int currentLen) {
+ prevValueLength = currentLen;
+ runLength = 0;
+ }
+
+ public void clear() {
+ valLenBuffer.reset();
+ columnValBuffer.reset();
+ prevValueLength = -1;
+ runLength = 0;
+ columnValueLength = 0;
+ columnKeyLength = 0;
+ uncompressedColumnValueLength = 0;
+ }
+
+ public int flushGroup() {
+ int len = 0;
+ if (prevValueLength >= 0) {
+ len += valLenBuffer.writeVLong(prevValueLength);
+ if (runLength > 0) {
+ len += valLenBuffer.writeVLong(~runLength);
+ }
+ columnKeyLength += len;
+ runLength = -1;
+ prevValueLength = -1;
+ }
+ return len;
+ }
+
+ public int UnFlushedGroupSize() {
+ int len = 0;
+ if (prevValueLength >= 0) {
+ len += WritableUtils.getVIntSize(prevValueLength);
+ if (runLength > 0) {
+ len += WritableUtils.getVIntSize(~runLength);
+ }
+ }
+ return len;
+ }
+ }
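
To make the run-length scheme of ColumnBuffer concrete, here is a standalone sketch
(independent of the appender state, using plain Java collections) that condenses a list of
value lengths the same way flushGroup() does.

    // Sketch only: 1,1,1,2 becomes 1,~2,2 (i.e. 1,-3,2); the negated value is the repeat count.
    static java.util.List<Long> encodeLengths(int[] lengths) {
      java.util.List<Long> out = new java.util.ArrayList<Long>();
      int prev = -1, run = 0;
      for (int len : lengths) {
        if (prev < 0) { prev = len; run = 0; continue; }      // first value starts a group
        if (len == prev) { run++; continue; }                 // same length: extend the run
        out.add((long) prev);                                 // flush the finished group
        if (run > 0) out.add((long) ~run);
        prev = len; run = 0;
      }
      if (prev >= 0) {                                        // flush the trailing group
        out.add((long) prev);
        if (run > 0) out.add((long) ~run);
      }
      return out;
    }
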
+
+ public long getLength() throws IOException {
+ return out.getPos();
+ }
+
+ public RCFileAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
+ final Schema schema, final TableMeta meta, final Path workDir) throws IOException {
+ super(conf, taskAttemptId, schema, meta, workDir);
+
+ RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
+ COLUMNS_BUFFER_SIZE = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR, COLUMNS_BUFFER_SIZE);
+ columnNumber = schema.size();
+ }
+
+ public void init() throws IOException {
+ fs = path.getFileSystem(conf);
+
+ if (!fs.exists(path.getParent())) {
+ throw new FileNotFoundException(path.toString());
+ }
+
+ //determine the intermediate file type
+ String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
+ TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
+ if (enabledStats && CatalogProtos.StoreType.RCFILE == CatalogProtos.StoreType.valueOf(store.toUpperCase())) {
+ isShuffle = true;
+ } else {
+ isShuffle = false;
+ }
+
+ if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
+ String codecClassname = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
+ try {
+ Class<? extends CompressionCodec> codecClass = conf.getClassByName(
+ codecClassname).asSubclass(CompressionCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IllegalArgumentException(
+ "Unknown codec: " + codecClassname, cnfe);
+ }
+ }
+
+ String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.RCFILE_NULL,
+ NullDatum.DEFAULT_TEXT));
+ if (StringUtils.isEmpty(nullCharacters)) {
+ nullChars = NullDatum.get().asTextBytes();
+ } else {
+ nullChars = nullCharacters.getBytes();
+ }
+
+ if (metadata == null) {
+ metadata = new Metadata();
+ }
+
+ metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text("" + columnNumber));
+
+ String serdeClass = this.meta.getOption(StorageConstants.RCFILE_SERDE,
+ BinarySerializerDeserializer.class.getName());
+ try {
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+ metadata.set(new Text(StorageConstants.RCFILE_SERDE), new Text(serdeClass));
+
+ columnBuffers = new ColumnBuffer[columnNumber];
+ for (int i = 0; i < columnNumber; i++) {
+ columnBuffers[i] = new ColumnBuffer();
+ }
+
+ init(conf, fs.create(path, true, 4096, (short) 3, fs.getDefaultBlockSize(), null), codec, metadata);
+ initializeFileHeader();
+ writeFileHeader();
+ finalizeFileHeader();
+
+ if (enabledStats) {
+ this.stats = new TableStatistics(this.schema);
+ }
+ super.init();
+ }
+
+ /**
+ * Write the initial part of file header.
+ */
+ void initializeFileHeader() throws IOException {
+ if (useNewMagic) {
+ out.write(MAGIC);
+ out.write(CURRENT_VERSION);
+ } else {
+ out.write(ORIGINAL_MAGIC_VERSION);
+ }
+ }
+
+ /**
+ * Write the final part of file header.
+ */
+ void finalizeFileHeader() throws IOException {
+ out.write(sync); // write the sync bytes
+ out.flush(); // flush header
+ }
+
+ boolean isCompressed() {
+ return codec != null;
+ }
+
+ /**
+ * Write and flush the file header.
+ */
+ void writeFileHeader() throws IOException {
+ if (useNewMagic) {
+ out.writeBoolean(isCompressed());
+ } else {
+ Text.writeString(out, "org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer");
+ Text.writeString(out, "org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer");
+ out.writeBoolean(isCompressed());
+ out.writeBoolean(false);
+ }
+
+ if (isCompressed()) {
+ Text.writeString(out, (codec.getClass()).getName());
+ }
+ metadata.write(out);
+ }
+
+ void init(Configuration conf, FSDataOutputStream out,
+ CompressionCodec codec, Metadata metadata) throws IOException {
+ this.out = out;
+ this.codec = codec;
+ this.metadata = metadata;
+ this.useNewMagic = conf.getBoolean(TajoConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true);
+ }
+
+ /**
+ * create a sync point.
+ */
+ public void sync() throws IOException {
+ if (sync != null && lastSyncPos != out.getPos()) {
+ out.writeInt(SYNC_ESCAPE); // mark the start of the sync
+ out.write(sync); // write sync
+ lastSyncPos = out.getPos(); // update lastSyncPos
+ }
+ }
+
+ private void checkAndWriteSync() throws IOException {
+ if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
+ sync();
+ }
+ }
+
+ private int columnBufferSize = 0;
+
+ @Override
+ public long getOffset() throws IOException {
+ return out.getPos();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ flushRecords();
+ out.flush();
+ }
+
+ @Override
+ public void addTuple(Tuple t) throws IOException {
+ append(t);
+ // Statistical section
+
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ }
+
+ /**
+ * Append a row of values. Currently it can only accept a
+ * {@link org.apache.tajo.storage.Tuple}. If its <code>size()</code> is less than the
+ * column number in the file, null values are appended for the missing columns.
+ * If its <code>size()</code> is greater than the column number in the file, the extra
+ * columns are ignored.
+ *
+ * @param tuple a Tuple with the list of serialized columns
+ * @throws java.io.IOException
+ */
+ public void append(Tuple tuple) throws IOException {
+ int size = schema.size();
+
+ for (int i = 0; i < size; i++) {
+ Datum datum = tuple.get(i);
+ int length = columnBuffers[i].append(schema.getColumn(i), datum);
+ columnBufferSize += length;
+ if (isShuffle) {
+ // it is to calculate min/max values, and it is only used for the intermediate file.
+ stats.analyzeField(i, datum);
+ }
+ }
+
+ if (size < columnNumber) {
+ for (int i = size; i < columnNumber; i++) {
+ columnBuffers[i].append(schema.getColumn(i), NullDatum.get());
+ if (isShuffle) {
+ stats.analyzeField(i, NullDatum.get());
+ }
+ }
+ }
+
+ bufferedRecords++;
+ //TODO compression-rate-based flush
+ if ((columnBufferSize > COLUMNS_BUFFER_SIZE)
+ || (bufferedRecords >= RECORD_INTERVAL)) {
+ flushRecords();
+ }
+ }
+
+ /**
+ * get number of bytes to store the keyBuffer.
+ *
+ * @return number of bytes used to store this KeyBuffer on disk
+ * @throws java.io.IOException
+ */
+ public int getKeyBufferSize() throws IOException {
+ int ret = 0;
+ ret += WritableUtils.getVIntSize(bufferedRecords);
+ for (int i = 0; i < columnBuffers.length; i++) {
+ ColumnBuffer currentBuf = columnBuffers[i];
+ ret += WritableUtils.getVIntSize(currentBuf.columnValueLength);
+ ret += WritableUtils.getVIntSize(currentBuf.uncompressedColumnValueLength);
+ ret += WritableUtils.getVIntSize(currentBuf.columnKeyLength);
+ ret += currentBuf.columnKeyLength;
+ }
+
+ return ret;
+ }
+
+ /**
+ * get number of bytes to store the key part.
+ *
+ * @return number of bytes used to store this Key part on disk
+ * @throws java.io.IOException
+ */
+ public int getKeyPartSize() throws IOException {
+ int ret = 12; //12 bytes |record count, key length, compressed key length|
+
+ ret += WritableUtils.getVIntSize(bufferedRecords);
+ for (int i = 0; i < columnBuffers.length; i++) {
+ ColumnBuffer currentBuf = columnBuffers[i];
+ ret += WritableUtils.getVIntSize(currentBuf.columnValueLength);
+ ret += WritableUtils.getVIntSize(currentBuf.uncompressedColumnValueLength);
+ ret += WritableUtils.getVIntSize(currentBuf.columnKeyLength);
+ ret += currentBuf.columnKeyLength;
+ ret += currentBuf.UnFlushedGroupSize();
+ }
+
+ return ret;
+ }
+
+ private void WriteKeyBuffer(DataOutputStream out) throws IOException {
+ WritableUtils.writeVLong(out, bufferedRecords);
+ for (int i = 0; i < columnBuffers.length; i++) {
+ ColumnBuffer currentBuf = columnBuffers[i];
+ WritableUtils.writeVLong(out, currentBuf.columnValueLength);
+ WritableUtils.writeVLong(out, currentBuf.uncompressedColumnValueLength);
+ WritableUtils.writeVLong(out, currentBuf.columnKeyLength);
+ currentBuf.valLenBuffer.writeTo(out);
+ }
+ }
+
+ private void flushRecords() throws IOException {
+
+ Compressor compressor = null;
+ NonSyncByteArrayOutputStream valueBuffer = null;
+ CompressionOutputStream deflateFilter = null;
+ DataOutputStream deflateOut = null;
+ boolean isCompressed = isCompressed();
+
+ int valueLength = 0;
+ if (isCompressed) {
+ compressor = org.apache.tajo.storage.compress.CodecPool.getCompressor(codec);
+ if (compressor != null) compressor.reset(); //builtin gzip is null
+
+ valueBuffer = new NonSyncByteArrayOutputStream();
+ deflateFilter = codec.createOutputStream(valueBuffer, compressor);
+ deflateOut = new DataOutputStream(deflateFilter);
+ }
+
+ try {
+ for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
+ ColumnBuffer currentBuf = columnBuffers[columnIndex];
+ currentBuf.flushGroup();
+
+ NonSyncByteArrayOutputStream columnValue = currentBuf.columnValBuffer;
+ int colLen;
+ int plainLen = columnValue.getLength();
+ if (isCompressed) {
+ deflateFilter.resetState();
+ deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
+ deflateOut.flush();
+ deflateFilter.finish();
+ columnValue.close();
+ // find how much compressed data was added for this column
+ colLen = valueBuffer.getLength() - valueLength;
+ currentBuf.columnValueLength = colLen;
+ } else {
+ colLen = plainLen;
+ }
+ valueLength += colLen;
+ }
+ } catch (IOException e) {
+ IOUtils.cleanup(LOG, deflateOut, out);
+ throw e;
+ }
+
+ if (compressor != null) {
+ org.apache.tajo.storage.compress.CodecPool.returnCompressor(compressor);
+ }
+
+ int keyLength = getKeyBufferSize();
+ if (keyLength < 0) {
+ throw new IOException("negative length keys not allowed: " + keyLength);
+ }
+ // Write the key out
+ writeKey(keyLength + valueLength, keyLength);
+ // write the value out
+ if (isCompressed) {
+ try {
+ out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
+ } finally {
+ IOUtils.cleanup(LOG, valueBuffer);
+ }
+ } else {
+ for (int columnIndex = 0; columnIndex < columnNumber; ++columnIndex) {
+ columnBuffers[columnIndex].columnValBuffer.writeTo(out);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Column#" + columnIndex + " : Plain Total Column Value Length: "
+ + columnBuffers[columnIndex].uncompressedColumnValueLength
+ + ", Compr Total Column Value Length: " + columnBuffers[columnIndex].columnValueLength);
+ }
+ }
+ }
+ // clear the columnBuffers
+ clearColumnBuffers();
+
+ bufferedRecords = 0;
+ columnBufferSize = 0;
+ }
+
+ private void writeKey(int recordLen, int keyLength) throws IOException {
+ checkAndWriteSync(); // sync
+ out.writeInt(recordLen); // total record length
+ out.writeInt(keyLength); // key portion length
+
+ if (this.isCompressed()) {
+ Compressor compressor = org.apache.tajo.storage.compress.CodecPool.getCompressor(codec);
+ if (compressor != null) compressor.reset(); //builtin gzip is null
+
+ NonSyncByteArrayOutputStream compressionBuffer = new NonSyncByteArrayOutputStream();
+ CompressionOutputStream deflateFilter = codec.createOutputStream(compressionBuffer, compressor);
+ DataOutputStream deflateOut = new DataOutputStream(deflateFilter);
+
+ //compress key and write key out
+ compressionBuffer.reset();
+ deflateFilter.resetState();
+ WriteKeyBuffer(deflateOut);
+ deflateOut.flush();
+ deflateFilter.finish();
+ int compressedKeyLen = compressionBuffer.getLength();
+ out.writeInt(compressedKeyLen);
+ compressionBuffer.writeTo(out);
+ compressionBuffer.reset();
+ deflateOut.close();
+ org.apache.tajo.storage.compress.CodecPool.returnCompressor(compressor);
+ } else {
+ out.writeInt(keyLength);
+ WriteKeyBuffer(out);
+ }
+ }
+
+ private void clearColumnBuffers() throws IOException {
+ for (int i = 0; i < columnNumber; i++) {
+ columnBuffers[i].clear();
+ }
+ }
+
+ @Override
+ public TableStats getStats() {
+ if (enabledStats) {
+ return stats.getTableStat();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (bufferedRecords > 0) {
+ flushRecords();
+ }
+ clearColumnBuffers();
+
+ if (out != null) {
+ // Statistical section
+ if (enabledStats) {
+ stats.setNumBytes(getOffset());
+ }
+ // Close the underlying stream if we own it...
+ out.flush();
+ IOUtils.cleanup(LOG, out);
+ out = null;
+ }
+ }
+ }
+
+ /**
+ * Read KeyBuffer/ValueBuffer pairs from a RCFile.
+ */
+ public static class RCFileScanner extends FileScanner {
+ private static class SelectedColumn {
+ public int colIndex;
+ public int rowReadIndex;
+ public int runLength;
+ public int prvLength;
+ public boolean isNulled;
+ }
+
+ private FSDataInputStream in;
+
+ private byte version;
+
+ private CompressionCodec codec = null;
+ private Metadata metadata = null;
+
+ private byte[] sync;
+ private byte[] syncCheck;
+ private boolean syncSeen;
+ private long lastSeenSyncPos = 0;
+
+ private long headerEnd;
+ private long start, end;
+ private final long startOffset, endOffset;
+ private int[] targetColumnIndexes;
+
+ private int currentKeyLength;
+ private int currentRecordLength;
+
+ private ValueBuffer currentValue;
+
+ private int readRowsIndexInBuffer = 0;
+
+ private int recordsNumInValBuffer = 0;
+
+ private int columnNumber = 0;
+
+ private boolean more = true;
+
+ private int passedRowsNum = 0;
+
+ private boolean decompress = false;
+
+ private Decompressor keyDecompressor;
+
+ private long readBytes = 0;
+
+ //Current state of each selected column - e.g. current run length, etc.
+ // The size of the array is equal to the number of selected columns
+ private SelectedColumn[] selectedColumns;
+
+ // column value lengths for each of the selected columns
+ private NonSyncDataInputBuffer[] colValLenBufferReadIn;
+
+ private LongWritable rowId;
+ private byte[] nullChars;
+ private SerializerDeserializer serde;
+
+ public RCFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
+ final Fragment fragment) throws IOException {
+ super(conf, schema, meta, fragment);
+ conf.setInt("io.file.buffer.size", 4096); //TODO remove
+
+ startOffset = this.fragment.getStartKey();
+ endOffset = startOffset + this.fragment.getLength();
+ start = 0;
+ }
+
+ @Override
+ public void init() throws IOException {
+ sync = new byte[SYNC_HASH_SIZE];
+ syncCheck = new byte[SYNC_HASH_SIZE];
+
+ more = startOffset < endOffset;
+ rowId = new LongWritable();
+ readBytes = 0;
+
+ String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.RCFILE_NULL,
+ NullDatum.DEFAULT_TEXT));
+ if (StringUtils.isEmpty(nullCharacters)) {
+ nullChars = NullDatum.get().asTextBytes();
+ } else {
+ nullChars = nullCharacters.getBytes();
+ }
+
+ // projection
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+ targetColumnIndexes = new int[targets.length];
+ for (int i = 0; i < targets.length; i++) {
+ targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
+ }
+ Arrays.sort(targetColumnIndexes);
+
+ FileSystem fs = fragment.getPath().getFileSystem(conf);
+ end = fs.getFileStatus(fragment.getPath()).getLen();
+ in = openFile(fs, fragment.getPath(), 4096);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RCFile open:" + fragment.getPath() + "," + start + "," + (endOffset - startOffset) +
+ "," + fs.getFileStatus(fragment.getPath()).getLen());
+ }
+ //init RCFILE Header
+ boolean succeed = false;
+ try {
+ if (start > 0) {
+ seek(0);
+ initHeader();
+ } else {
+ initHeader();
+ }
+ succeed = true;
+ } finally {
+ if (!succeed) {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ if (LOG != null && LOG.isDebugEnabled()) {
+ LOG.debug("Exception in closing " + in, e);
+ }
+ }
+ }
+ }
+ }
+
+ columnNumber = Integer.parseInt(metadata.get(new Text(COLUMN_NUMBER_METADATA_STR)).toString());
+ selectedColumns = new SelectedColumn[targetColumnIndexes.length];
+ colValLenBufferReadIn = new NonSyncDataInputBuffer[targetColumnIndexes.length];
+ boolean[] skippedColIDs = new boolean[columnNumber];
+ Arrays.fill(skippedColIDs, true);
+ super.init();
+
+ for (int i = 0; i < targetColumnIndexes.length; i++) {
+ int tid = targetColumnIndexes[i];
+ if (tid < columnNumber) {
+ skippedColIDs[tid] = false;
+
+ SelectedColumn col = new SelectedColumn();
+ col.colIndex = tid;
+ col.runLength = 0;
+ col.prvLength = -1;
+ col.rowReadIndex = 0;
+ selectedColumns[i] = col;
+ colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
+ }
+ }
+
+ currentKey = createKeyBuffer();
+ currentValue = new ValueBuffer(null, columnNumber, targetColumnIndexes, codec, skippedColIDs);
+
+ if (startOffset > getPosition()) { // TODO use sync cache
+ sync(startOffset); // sync to start
+ }
+ }
+
+ /**
+ * Return the metadata (Text to Text map) that was written into the
+ * file.
+ */
+ public Metadata getMetadata() {
+ return metadata;
+ }
+
+ /**
+ * Return the metadata value associated with the given key.
+ *
+ * @param key the metadata key to retrieve
+ */
+ public Text getMetadataValueOf(Text key) {
+ return metadata.get(key);
+ }
+
+ /**
+ * Override this method to specialize the type of
+ * {@link org.apache.hadoop.fs.FSDataInputStream} returned.
+ */
+ protected FSDataInputStream openFile(FileSystem fs, Path file, int bufferSize) throws IOException {
+ return fs.open(file, bufferSize);
+ }
+
+ private void initHeader() throws IOException {
+ byte[] magic = new byte[MAGIC.length];
+ in.readFully(magic);
+
+ if (Arrays.equals(magic, ORIGINAL_MAGIC)) {
+ byte vers = in.readByte();
+ if (vers != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
+ throw new IOException(fragment.getPath() + " is a version " + vers +
+ " SequenceFile instead of an RCFile.");
+ }
+ version = ORIGINAL_VERSION;
+ } else {
+ if (!Arrays.equals(magic, MAGIC)) {
+ throw new IOException(fragment.getPath() + " not a RCFile and has magic of " +
+ new String(magic));
+ }
+
+ // Set 'version'
+ version = in.readByte();
+ if (version > CURRENT_VERSION) {
+ throw new VersionMismatchException((byte) CURRENT_VERSION, version);
+ }
+ }
+
+ if (version == ORIGINAL_VERSION) {
+ try {
+ Class<?> keyCls = conf.getClassByName(Text.readString(in));
+ Class<?> valCls = conf.getClassByName(Text.readString(in));
+ if (!keyCls.equals(KeyBuffer.class)
+ || !valCls.equals(ValueBuffer.class)) {
+ throw new IOException(fragment.getPath() + " not a RCFile");
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IOException(fragment.getPath() + " not a RCFile", e);
+ }
+ }
+
+ decompress = in.readBoolean(); // is compressed?
+
+ if (version == ORIGINAL_VERSION) {
+ // is block-compressed? it should be always false.
+ boolean blkCompressed = in.readBoolean();
+ if (blkCompressed) {
+ throw new IOException(fragment.getPath() + " not a RCFile.");
+ }
+ }
+
+ // setup the compression codec
+ if (decompress) {
+ String codecClassname = Text.readString(in);
+ try {
+ Class<? extends CompressionCodec> codecClass = conf.getClassByName(
+ codecClassname).asSubclass(CompressionCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IllegalArgumentException(
+ "Unknown codec: " + codecClassname, cnfe);
+ }
+
+ keyDecompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
+ }
+
+ metadata = new Metadata();
+ metadata.readFields(in);
+
+ Text text = metadata.get(new Text(StorageConstants.RCFILE_SERDE));
+
+ try {
+ String serdeClass;
+ if(text != null && !text.toString().isEmpty()){
+ serdeClass = text.toString();
+ } else{
+ serdeClass = this.meta.getOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
+ }
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+
+ in.readFully(sync); // read sync bytes
+ headerEnd = in.getPos();
+ lastSeenSyncPos = headerEnd; //initial sync position
+ readBytes += headerEnd;
+ }
+
+ /**
+ * Return the current byte position in the input file.
+ */
+ public long getPosition() throws IOException {
+ return in.getPos();
+ }
+
+ /**
+ * Set the current byte position in the input file.
+ * <p/>
+ * <p/>
+ * The position passed must be a position returned by
+ * {@link org.apache.tajo.storage.rcfile.RCFile.RCFileAppender#getLength()} when writing this file. To seek to an
+ * arbitrary position, use {@link org.apache.tajo.storage.rcfile.RCFile.RCFileScanner#sync(long)}. In other
+ * words, the current seek can only seek to the end of the file. For other
+ * positions, use {@link org.apache.tajo.storage.rcfile.RCFile.RCFileScanner#sync(long)}.
+ */
+ public void seek(long position) throws IOException {
+ in.seek(position);
+ }
+
+ /**
+ * Resets the values which determine if there are more rows in the buffer.
+ * <p/>
+ * This can be used after one calls seek or sync, if one called next before that.
+ * Otherwise, the seek or sync will have no effect; the scanner will continue to return rows from the
+ * buffer built up by the previous call to next.
+ */
+ public void resetBuffer() {
+ readRowsIndexInBuffer = 0;
+ recordsNumInValBuffer = 0;
+ }
+
+ /**
+ * Seek to the next sync mark past a given position.
+ */
+ public void sync(long position) throws IOException {
+ if (position + SYNC_SIZE >= end) {
+ seek(end);
+ return;
+ }
+
+ //this is to handle sync(pos) where pos < headerEnd.
+ if (position < headerEnd) {
+ // seek directly to first record
+ in.seek(headerEnd);
+ // note the sync marker "seen" in the header
+ syncSeen = true;
+ return;
+ }
+
+ try {
+ seek(position + 4); // skip escape
+
+ int prefix = sync.length;
+ int n = conf.getInt("io.bytes.per.checksum", 512);
+ byte[] buffer = new byte[prefix + n];
+ n = (int) Math.min(n, end - in.getPos());
+ /* fill array with a pattern that will never match sync */
+ Arrays.fill(buffer, (byte) (~sync[0]));
+ while (n > 0 && (in.getPos() + n) <= end) {
+ position = in.getPos();
+ in.readFully(buffer, prefix, n);
+ readBytes += n;
+ /* the buffer has n+sync bytes */
+ for (int i = 0; i < n; i++) {
+ int j;
+ for (j = 0; j < sync.length && sync[j] == buffer[i + j]; j++) {
+ /* nothing */
+ }
+ if (j == sync.length) {
+ /* simplified from (position + (i - prefix) + sync.length) - SYNC_SIZE */
+ in.seek(position + i - SYNC_SIZE);
+ return;
+ }
+ }
+ /* move the last 16 bytes to the prefix area */
+ System.arraycopy(buffer, buffer.length - prefix, buffer, 0, prefix);
+ n = (int) Math.min(n, end - in.getPos());
+ }
+ } catch (ChecksumException e) { // checksum failure
+ handleChecksumException(e);
+ }
+ }
+
+ private void handleChecksumException(ChecksumException e) throws IOException {
+ if (conf.getBoolean("io.skip.checksum.errors", false)) {
+ LOG.warn("Bad checksum at " + getPosition() + ". Skipping entries.");
+ sync(getPosition() + conf.getInt("io.bytes.per.checksum", 512));
+ } else {
+ throw e;
+ }
+ }
+
+ private KeyBuffer createKeyBuffer() {
+ return new KeyBuffer(columnNumber);
+ }
+
+ /**
+ * Read and return the next record length, potentially skipping over a sync
+ * block.
+ *
+ * @return the length of the next record or -1 if there is no next record
+ * @throws java.io.IOException
+ */
+ private int readRecordLength() throws IOException {
+ if (in.getPos() >= end) {
+ return -1;
+ }
+ int length = in.readInt();
+ readBytes += 4;
+ if (sync != null && length == SYNC_ESCAPE) { // process a sync entry
+ lastSeenSyncPos = in.getPos() - 4; // minus SYNC_ESCAPE's length
+ in.readFully(syncCheck); // read syncCheck
+ readBytes += SYNC_HASH_SIZE;
+ if (!Arrays.equals(sync, syncCheck)) {
+ throw new IOException("File is corrupt!");
+ }
+ syncSeen = true;
+ if (in.getPos() >= end) {
+ return -1;
+ }
+ length = in.readInt(); // re-read length
+ readBytes += 4;
+ } else {
+ syncSeen = false;
+ }
+ return length;
+ }
+
+ private void seekToNextKeyBuffer() throws IOException {
+ if (!keyInit) {
+ return;
+ }
+ if (!currentValue.inited) {
+ IOUtils.skipFully(in, currentRecordLength - currentKeyLength);
+ }
+ }
+
+ private int compressedKeyLen = 0;
+ NonSyncDataInputBuffer keyDataIn = new NonSyncDataInputBuffer();
+ NonSyncDataInputBuffer keyDecompressBuffer = new NonSyncDataInputBuffer();
+
+ KeyBuffer currentKey = null;
+ boolean keyInit = false;
+
+ protected int nextKeyBuffer() throws IOException {
+ seekToNextKeyBuffer();
+ currentRecordLength = readRecordLength();
+ if (currentRecordLength == -1) {
+ keyInit = false;
+ return -1;
+ }
+ currentKeyLength = in.readInt();
+ compressedKeyLen = in.readInt();
+ readBytes += 8;
+ if (decompress) {
+
+ byte[] compressedBytes = new byte[compressedKeyLen];
+ in.readFully(compressedBytes, 0, compressedKeyLen);
+
+ if (keyDecompressor != null) keyDecompressor.reset();
+ keyDecompressBuffer.reset(compressedBytes, compressedKeyLen);
+
+ DataInputStream is;
+ if (codec instanceof SplittableCompressionCodec) {
+ SplitCompressionInputStream deflatFilter = ((SplittableCompressionCodec) codec).createInputStream(
+ keyDecompressBuffer, keyDecompressor, 0, compressedKeyLen, SplittableCompressionCodec.READ_MODE.BYBLOCK);
+
+ keyDecompressBuffer.seek(deflatFilter.getAdjustedStart());
+ is = new DataInputStream(deflatFilter);
+ } else {
+ CompressionInputStream deflatFilter = codec.createInputStream(keyDecompressBuffer, keyDecompressor);
+ is = new DataInputStream(deflatFilter);
+ }
+
+ byte[] deCompressedBytes = new byte[currentKeyLength];
+
+ is.readFully(deCompressedBytes, 0, currentKeyLength);
+ keyDataIn.reset(deCompressedBytes, currentKeyLength);
+ currentKey.readFields(keyDataIn);
+ is.close();
+ } else {
+ currentKey.readFields(in);
+ }
+ readBytes += currentKeyLength;
+ keyInit = true;
+ currentValue.inited = false;
+
+ readRowsIndexInBuffer = 0;
+ recordsNumInValBuffer = currentKey.numberRows;
+
+ for (int selIx = 0; selIx < selectedColumns.length; selIx++) {
+ SelectedColumn col = selectedColumns[selIx];
+ if (col == null) {
+ col = new SelectedColumn();
+ col.isNulled = true;
+ selectedColumns[selIx] = col;
+ continue;
+ }
+
+ int colIx = col.colIndex;
+ NonSyncByteArrayOutputStream buf = currentKey.allCellValLenBuffer[colIx];
+ colValLenBufferReadIn[selIx].reset(buf.getData(), buf.getLength());
+ col.rowReadIndex = 0;
+ col.runLength = 0;
+ col.prvLength = -1;
+ col.isNulled = buf.getLength() == 0;
+ }
+
+ return currentKeyLength;
+ }
+
+ protected void currentValueBuffer() throws IOException {
+ if (!keyInit) {
+ nextKeyBuffer();
+ }
+ currentValue.keyBuffer = currentKey;
+ currentValue.clearColumnBuffer();
+ currentValue.readFields(in);
+ currentValue.inited = true;
+ readBytes += currentValue.getReadBytes();
+
+ if (tableStats != null) {
+ tableStats.setReadBytes(readBytes);
+ tableStats.setNumRows(passedRowsNum);
+ }
+ }
+
+ private boolean rowFetched = false;
+
+ @Override
+ public Tuple next() throws IOException {
+ if (!more) {
+ return null;
+ }
+
+ more = nextBuffer(rowId);
+ long lastSeenSyncPos = lastSeenSyncPos();
+ if (lastSeenSyncPos >= endOffset) {
+ more = false;
+ return null;
+ }
+
+ if (!more) {
+ return null;
+ }
+
+ Tuple tuple = new VTuple(schema.size());
+ getCurrentRow(tuple);
+ return tuple;
+ }
+
+ @Override
+ public float getProgress() {
+ try {
+ if(!more) {
+ return 1.0f;
+ }
+ long filePos = getPosition();
+ if (startOffset == filePos) {
+ return 0.0f;
+ } else {
+ //if scanner read the header, filePos moved to zero
+ return Math.min(1.0f, (float)(Math.max(filePos - startOffset, 0)) / (float)(fragment.getLength()));
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ return 0.0f;
+ }
+ }
+
+ /**
+ * Advances to the next row and sets <code>readRows</code> to how many rows have been
+ * fetched by nextBuffer() so far. The reported count may be smaller than the actual number
+ * of rows passed by, because {@link #seek(long)} can change the underlying key buffer and
+ * value buffer.
+ *
+ * @return true if another row is available
+ * @throws java.io.IOException
+ */
+ public boolean nextBuffer(LongWritable readRows) throws IOException {
+ if (readRowsIndexInBuffer < recordsNumInValBuffer) {
+ readRows.set(passedRowsNum);
+ readRowsIndexInBuffer++;
+ passedRowsNum++;
+ rowFetched = false;
+ return true;
+ } else {
+ keyInit = false;
+ }
+
+ int ret = -1;
+ try {
+ ret = nextKeyBuffer();
+ } catch (EOFException eof) {
+ eof.printStackTrace();
+ }
+ return (ret > 0) && nextBuffer(readRows);
+ }
+
+ /**
+ * Get the current row. Make sure {@link #next()} has been called
+ * first.
+ *
+ * @throws java.io.IOException
+ */
+ public void getCurrentRow(Tuple tuple) throws IOException {
+ if (!keyInit || rowFetched) {
+ return;
+ }
+
+ if (!currentValue.inited) {
+ currentValueBuffer();
+ }
+
+ for (int j = 0; j < selectedColumns.length; ++j) {
+ SelectedColumn col = selectedColumns[j];
+ int i = col.colIndex;
+
+ if (col.isNulled) {
+ tuple.put(i, NullDatum.get());
+ } else {
+ colAdvanceRow(j, col);
+
+ Datum datum = serde.deserialize(schema.getColumn(i),
+ currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength, nullChars);
+ tuple.put(i, datum);
+ col.rowReadIndex += col.prvLength;
+ }
+ }
+ rowFetched = true;
+ }
+
+ /**
+ * Advance column state to the next row: update offsets, run lengths, etc.
+ *
+ * @param selCol - index among selectedColumns
+ * @param col - column object to update the state of. prvLength will be
+ * set to the new read position
+ * @throws java.io.IOException
+ */
+ private void colAdvanceRow(int selCol, SelectedColumn col) throws IOException {
+ if (col.runLength > 0) {
+ --col.runLength;
+ } else {
+ int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[selCol]);
+ if (length < 0) {
+ // we reach a runlength here, use the previous length and reset
+ // runlength
+ col.runLength = (~length) - 1;
+ } else {
+ col.prvLength = length;
+ col.runLength = 0;
+ }
+ }
+ }
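
As a counterpart to the encoding sketch given after ColumnBuffer, here is a standalone
helper (hypothetical, not part of this commit) that expands such a run-length-compressed
length stream back into one length per row, mirroring what colAdvanceRow() does incrementally.

    // Sketch only: [1, ~2, 2] (i.e. [1, -3, 2]) expands back to 1,1,1,2.
    static java.util.List<Integer> decodeLengths(long[] encoded) {
      java.util.List<Integer> out = new java.util.ArrayList<Integer>();
      int prev = -1;
      for (long v : encoded) {
        if (v < 0) {
          int repeats = (int) (~v);                 // negated entries are repeat counts
          for (int k = 0; k < repeats; k++) {
            out.add(prev);                          // repeat the previous length
          }
        } else {
          prev = (int) v;
          out.add(prev);
        }
      }
      return out;
    }
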
+
+ /**
+ * Returns true if the previous call to next passed a sync mark.
+ */
+ public boolean syncSeen() {
+ return syncSeen;
+ }
+
+ /**
+ * Returns the last seen sync position.
+ */
+ public long lastSeenSyncPos() {
+ return lastSeenSyncPos;
+ }
+
+ /**
+ * Returns the name of the file.
+ */
+ @Override
+ public String toString() {
+ return fragment.getPath().toString();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ seek(startOffset);
+ }
+
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (tableStats != null) {
+ tableStats.setReadBytes(readBytes); //Actual Processed Bytes. (decompressed bytes + header - seek)
+ tableStats.setNumRows(passedRowsNum);
+ }
+
+ IOUtils.cleanup(LOG, in, currentValue);
+ if (keyDecompressor != null) {
+ // Make sure we only return decompressor once.
+ org.apache.tajo.storage.compress.CodecPool.returnDecompressor(keyDecompressor);
+ keyDecompressor = null;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionInputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionInputStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionInputStream.java
new file mode 100644
index 0000000..60f1b06
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionInputStream.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import java.io.InputStream;
+
+/**
+ *
+ * SchemaAwareCompressionInputStream adds the ability to inform the compression
+ * stream what column is being read.
+ *
+ */
+public abstract class SchemaAwareCompressionInputStream extends CompressionInputStream {
+
+ protected SchemaAwareCompressionInputStream(InputStream in) throws java.io.IOException {
+ super(in);
+ }
+
+ /**
+ * The column being read
+ *
+ * @param columnIndex the index of the column. Use -1 for non-column data
+ */
+ public abstract void setColumnIndex(int columnIndex);
+}
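+
+/**
+ * A minimal illustrative sketch of a hypothetical concrete subclass: a pass-through
+ * stream that only records the column index it is asked to decode, e.g. so a codec
+ * could pick a per-column decoding strategy. A real implementation would decompress
+ * in read().
+ */
+class PassThroughColumnInputStream extends SchemaAwareCompressionInputStream {
+
+ private int columnIndex = -1;
+
+ PassThroughColumnInputStream(InputStream in) throws java.io.IOException {
+ super(in);
+ }
+
+ @Override
+ public void setColumnIndex(int columnIndex) {
+ this.columnIndex = columnIndex; // -1 marks non-column data such as key metadata
+ }
+
+ @Override
+ public int read() throws java.io.IOException {
+ return in.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws java.io.IOException {
+ return in.read(b, off, len);
+ }
+
+ @Override
+ public void resetState() throws java.io.IOException {
+ // nothing is buffered in this pass-through sketch
+ }
+}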
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionOutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionOutputStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionOutputStream.java
new file mode 100644
index 0000000..c0ce8b3
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/SchemaAwareCompressionOutputStream.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+
+import java.io.OutputStream;
+
+/**
+ *
+ * SchemaAwareCompressionOutputStream adds the ability to inform the compression
+ * stream which column is currently being compressed.
+ *
+ */
+public abstract class SchemaAwareCompressionOutputStream extends CompressionOutputStream {
+
+ protected SchemaAwareCompressionOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ /**
+ *
+ * The column being output
+ *
+ * @param columnIndex the index of the column. Use -1 for non-column data
+ */
+ public abstract void setColumnIndex(int columnIndex);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
new file mode 100644
index 0000000..14e0f26
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.sequencefile;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.util.BytesUtils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public class SequenceFileAppender extends FileAppender {
+ private static final Log LOG = LogFactory.getLog(SequenceFileAppender.class);
+
+ private SequenceFile.Writer writer;
+
+ private TableMeta meta;
+ private Schema schema;
+ private TableStatistics stats = null;
+
+ private int columnNum;
+ private FileSystem fs;
+ private char delimiter;
+ private byte[] nullChars;
+
+ private final static int BUFFER_SIZE = 128 * 1024;
+ private long pos = 0;
+
+ private CompressionCodecFactory codecFactory;
+ private CompressionCodec codec;
+
+ private NonSyncByteArrayOutputStream os;
+ private SerializerDeserializer serde;
+
+ long rowCount;
+ private boolean isShuffle;
+
+ private Writable EMPTY_KEY;
+
+ public SequenceFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ Schema schema, TableMeta meta, Path workDir) throws IOException {
+ super(conf, taskAttemptId, schema, meta, workDir);
+ this.meta = meta;
+ this.schema = schema;
+ }
+
+ @Override
+ public void init() throws IOException {
+ os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+
+ this.fs = path.getFileSystem(conf);
+
+ // determine whether this appender is writing an intermediate (shuffle) file
+ String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
+ TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
+ if (enabledStats && CatalogProtos.StoreType.SEQUENCEFILE == CatalogProtos.StoreType.valueOf(store.toUpperCase())) {
+ isShuffle = true;
+ } else {
+ isShuffle = false;
+ }
+
+ this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.SEQUENCEFILE_DELIMITER,
+ StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
+ this.columnNum = schema.size();
+ String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.SEQUENCEFILE_NULL,
+ NullDatum.DEFAULT_TEXT));
+ if (StringUtils.isEmpty(nullCharacters)) {
+ nullChars = NullDatum.get().asTextBytes();
+ } else {
+ nullChars = nullCharacters.getBytes();
+ }
+
+ if (!fs.exists(path.getParent())) {
+ throw new FileNotFoundException(path.toString());
+ }
+
+ if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
+ String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
+ codecFactory = new CompressionCodecFactory(conf);
+ codec = codecFactory.getCodecByClassName(codecName);
+ } else {
+ if (fs.exists(path)) {
+ throw new AlreadyExistsStorageException(path);
+ }
+ }
+
+ try {
+ String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE,
+ TextSerializerDeserializer.class.getName());
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+
+ Class<? extends Writable> keyClass, valueClass;
+ if (serde instanceof BinarySerializerDeserializer) {
+ keyClass = BytesWritable.class;
+ EMPTY_KEY = new BytesWritable();
+ valueClass = BytesWritable.class;
+ } else {
+ keyClass = LongWritable.class;
+ EMPTY_KEY = new LongWritable();
+ valueClass = Text.class;
+ }
+
+ String type = this.meta.getOption(StorageConstants.COMPRESSION_TYPE, CompressionType.NONE.name());
+ if (type.equals(CompressionType.BLOCK.name())) {
+ writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.BLOCK, codec);
+ } else if (type.equals(CompressionType.RECORD.name())) {
+ writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.RECORD, codec);
+ } else {
+ writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.NONE, codec);
+ }
+
+ if (enabledStats) {
+ this.stats = new TableStatistics(this.schema);
+ }
+
+ super.init();
+ }
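+
+ /**
+ * A minimal sketch, not called above, of the same decision the three branches in init()
+ * make (assuming the option value exactly matches an enum constant name): map the
+ * COMPRESSION_TYPE table option onto SequenceFile's CompressionType and fall back to
+ * NONE for unrecognized values.
+ */
+ private static CompressionType resolveCompressionType(String optionValue) {
+ try {
+ return CompressionType.valueOf(optionValue);
+ } catch (IllegalArgumentException e) {
+ return CompressionType.NONE; // unknown or unsupported value: write without compression
+ }
+ }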
+
+ @Override
+ public void addTuple(Tuple tuple) throws IOException {
+ Datum datum;
+
+ if (serde instanceof BinarySerializerDeserializer) {
+ byte nullByte = 0;
+ int lasti = 0;
+ for (int i = 0; i < columnNum; i++) {
+ datum = tuple.get(i);
+
+ // set bit to 1 if a field is not null
+ if (null != datum) {
+ nullByte |= 1 << (i % 8);
+ }
+
+ // write the null byte after every eighth column, or at the last column,
+ // and then serialize the corresponding (up to eight) fields
+ if (7 == i % 8 || i == columnNum - 1) {
+ os.write(nullByte);
+
+ for (int j = lasti; j <= i; j++) {
+ datum = tuple.get(j);
+
+ switch (schema.getColumn(j).getDataType().getType()) {
+ case TEXT:
+ BytesUtils.writeVLong(os, datum.asTextBytes().length);
+ break;
+ case PROTOBUF:
+ ProtobufDatum protobufDatum = (ProtobufDatum) datum;
+ BytesUtils.writeVLong(os, protobufDatum.asByteArray().length);
+ break;
+ case CHAR:
+ case INET4:
+ case BLOB:
+ BytesUtils.writeVLong(os, datum.asByteArray().length);
+ break;
+ default:
+ }
+
+ serde.serialize(schema.getColumn(j), datum, os, nullChars);
+
+ if (isShuffle) {
+ // collect min/max statistics; this is only needed for the intermediate (shuffle) file
+ stats.analyzeField(j, datum);
+ }
+ }
+ lasti = i + 1;
+ nullByte = 0;
+ }
+ }
+
+ BytesWritable b = new BytesWritable();
+ b.set(os.getData(), 0, os.getLength());
+ writer.append(EMPTY_KEY, b);
+
+ } else {
+ for (int i = 0; i < columnNum; i++) {
+ datum = tuple.get(i);
+ serde.serialize(schema.getColumn(i), datum, os, nullChars);
+
+ if (columnNum - 1 > i) {
+ os.write((byte) delimiter);
+ }
+
+ if (isShuffle) {
+ // collect min/max statistics; this is only needed for the intermediate (shuffle) file
+ stats.analyzeField(i, datum);
+ }
+
+ }
+ writer.append(EMPTY_KEY, new Text(os.toByteArray()));
+ }
+
+ os.reset();
+ pos += writer.getLength();
+ rowCount++;
+
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ }
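+
+ /**
+ * A small illustrative helper (not called above): computes the null-indicator bytes the
+ * binary branch of addTuple() writes, one byte per group of up to eight columns, with
+ * bit (i % 8) set when column i is non-null. For ten columns with columns 3 and 9 null
+ * this yields { (byte) 0xF7, (byte) 0x01 }.
+ */
+ private static byte[] nullIndicatorBytes(boolean[] isNull) {
+ byte[] indicators = new byte[(isNull.length + 7) / 8];
+ for (int i = 0; i < isNull.length; i++) {
+ if (!isNull[i]) {
+ indicators[i / 8] |= 1 << (i % 8); // mark the field as present
+ }
+ }
+ return indicators;
+ }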
+
+ @Override
+ public long getOffset() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ os.flush();
+ writer.close();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Statistical section
+ if (enabledStats) {
+ stats.setNumBytes(getOffset());
+ }
+
+ os.close();
+ writer.close();
+ }
+
+ @Override
+ public TableStats getStats() {
+ if (enabledStats) {
+ return stats.getTableStat();
+ } else {
+ return null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
new file mode 100644
index 0000000..74563ff
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.sequencefile;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.*;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.BytesUtils;
+
+import java.io.IOException;
+
+public class SequenceFileScanner extends FileScanner {
+ private static final Log LOG = LogFactory.getLog(SequenceFileScanner.class);
+
+ private FileSystem fs;
+ private SequenceFile.Reader reader;
+ private SerializerDeserializer serde;
+ private byte[] nullChars;
+ private char delimiter;
+
+ private int currentIdx = 0;
+ private int[] projectionMap;
+
+ private boolean hasBinarySerDe = false;
+ private long totalBytes = 0L;
+
+ private long start, end;
+ private boolean more = true;
+
+ /**
+ * Whether a field is null or not. A length of 0 does not mean the field is
+ * null; in particular, a 0-length string is not null.
+ */
+ private boolean[] fieldIsNull;
+
+ /**
+ * The start positions and lengths of fields. Only valid when the data is parsed.
+ */
+ private int[] fieldStart;
+ private int[] fieldLength;
+
+ private int elementOffset, elementSize;
+
+ private Writable EMPTY_KEY;
+
+ public SequenceFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
+ super(conf, schema, meta, fragment);
+ }
+
+ @Override
+ public void init() throws IOException {
+ // FileFragment information
+ if(fs == null) {
+ fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath());
+ }
+
+ reader = new SequenceFile.Reader(fs, fragment.getPath(), conf);
+
+ String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.SEQUENCEFILE_NULL,
+ NullDatum.DEFAULT_TEXT));
+ if (StringUtils.isEmpty(nullCharacters)) {
+ nullChars = NullDatum.get().asTextBytes();
+ } else {
+ nullChars = nullCharacters.getBytes();
+ }
+
+ String delim = meta.getOption(StorageConstants.SEQUENCEFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+ this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
+
+ this.start = fragment.getStartKey();
+ this.end = start + fragment.getLength();
+
+ if (fragment.getStartKey() > reader.getPosition())
+ reader.sync(this.start);
+
+ more = start < end;
+
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+
+ fieldIsNull = new boolean[schema.getColumns().size()];
+ fieldStart = new int[schema.getColumns().size()];
+ fieldLength = new int[schema.getColumns().size()];
+
+ prepareProjection(targets);
+
+ try {
+ String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+
+ if (serde instanceof BinarySerializerDeserializer) {
+ hasBinarySerDe = true;
+ }
+
+ Class<? extends Writable> keyClass = (Class<? extends Writable>)Class.forName(reader.getKeyClassName());
+ EMPTY_KEY = keyClass.newInstance();
+
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+ super.init();
+ }
+
+ public Writable getKey() {
+ return EMPTY_KEY;
+ }
+
+ private void prepareProjection(Column [] targets) {
+ projectionMap = new int[targets.length];
+
+ int tid;
+ for (int i = 0; i < targets.length; i++) {
+ tid = schema.getColumnId(targets[i].getQualifiedName());
+ projectionMap[i] = tid;
+ }
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ if (!more) return null;
+
+ long pos = reader.getPosition();
+ boolean remaining = reader.next(EMPTY_KEY);
+
+ if (pos >= end && reader.syncSeen()) {
+ more = false;
+ } else {
+ more = remaining;
+ }
+
+ if (more) {
+ Tuple tuple = null;
+ byte[][] cells;
+
+ if (hasBinarySerDe) {
+ BytesWritable bytesWritable = new BytesWritable();
+ reader.getCurrentValue(bytesWritable);
+ tuple = makeTuple(bytesWritable);
+ totalBytes += (long)bytesWritable.getBytes().length;
+ } else {
+ Text text = new Text();
+ reader.getCurrentValue(text);
+ cells = BytesUtils.splitPreserveAllTokens(text.getBytes(), delimiter, projectionMap);
+ totalBytes += (long)text.getBytes().length;
+ tuple = new LazyTuple(schema, cells, 0, nullChars, serde);
+ }
+ currentIdx++;
+ return tuple;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * In Hive, a LazyBinarySerDe row is serialized as follows: start A B A B A B end, i.e. bytes[] ->
+ * |-----|---------|--- ... ---|-----|---------|
+ *
+ * Section A is one null byte corresponding to the eight struct fields in Section
+ * B. Each bit indicates whether the corresponding field is null (0) or not null
+ * (1). Each field is a LazyBinaryObject.
+ *
+ * Following B there is another section A and B. This pattern repeats until all
+ * struct fields are serialized.
+ *
+ * So Tajo has to build a tuple by parsing this Hive-style binary layout.
+ */
+ private Tuple makeTuple(BytesWritable value) throws IOException{
+ Tuple tuple = new VTuple(schema.getColumns().size());
+
+ int start = 0;
+ int length = value.getLength();
+
+ /**
+ * Note that one null byte is followed by up to eight fields, then another
+ * null byte and more fields.
+ */
+ int structByteEnd = start + length;
+ byte[] bytes = value.getBytes();
+
+ byte nullByte = bytes[start];
+ int lastFieldByteEnd = start + 1;
+
+ // Go through all bytes in the byte[]
+ for (int i = 0; i < schema.getColumns().size(); i++) {
+ fieldIsNull[i] = true;
+ if ((nullByte & (1 << (i % 8))) != 0) {
+ fieldIsNull[i] = false;
+ parse(schema.getColumn(i), bytes, lastFieldByteEnd);
+
+ fieldStart[i] = lastFieldByteEnd + elementOffset;
+ fieldLength[i] = elementSize;
+ lastFieldByteEnd = fieldStart[i] + fieldLength[i];
+
+ for (int j = 0; j < projectionMap.length; j++) {
+ if (projectionMap[j] == i) {
+ Datum datum = serde.deserialize(schema.getColumn(i), bytes, fieldStart[i], fieldLength[i], nullChars);
+ tuple.put(i, datum);
+ }
+ }
+ }
+
+ // next byte is a null byte if there are more bytes to go
+ if (7 == (i % 8)) {
+ if (lastFieldByteEnd < structByteEnd) {
+ nullByte = bytes[lastFieldByteEnd];
+ lastFieldByteEnd++;
+ } else {
+ // otherwise all null afterwards
+ nullByte = 0;
+ lastFieldByteEnd++;
+ }
+ }
+ }
+
+ return tuple;
+ }
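+
+ /*
+ * A worked illustration of the layout makeTuple() walks, assuming single-byte length
+ * prefixes: for a three-column row ("ab", NULL, "xyz") the value bytes look like
+ *
+ *   0x05                 null byte: bits 0 and 2 set, so columns 0 and 2 are present
+ *   0x02 'a' 'b'         column 0: one-byte length prefix, then the field bytes
+ *   0x03 'x' 'y' 'z'     column 2: the null column 1 contributes no bytes at all
+ *
+ * fieldStart skips each length prefix (elementOffset = 1) and fieldLength is read from
+ * that prefix byte (elementSize).
+ */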
+
+ /**
+ * Check a particular field and set its size and offset in bytes, based on the
+ * field type and the byte array.
+ *
+ * For fixed-width types (boolean, bit, int2, float4, float8) there is no offset
+ * and the size is fixed. For INT4 and INT8 the size is the number of bytes of the
+ * variable-length (VInt) encoding, determined from its first byte at the given
+ * offset. For variable-length types such as TEXT, CHAR and BLOB the first byte
+ * stores the size, so the offset is 1 and the size is read from that byte.
+ *
+ * @param col
+ * catalog column information
+ * @param bytes
+ * byte array storing the table row
+ * @param offset
+ * offset of this field
+ */
+ private void parse(Column col, byte[] bytes, int offset) throws
+ IOException {
+ switch (col.getDataType().getType()) {
+ case BOOLEAN:
+ case BIT:
+ elementOffset = 0;
+ elementSize = 1;
+ break;
+ case INT2:
+ elementOffset = 0;
+ elementSize = 2;
+ break;
+ case INT4:
+ case INT8:
+ elementOffset = 0;
+ elementSize = WritableUtils.decodeVIntSize(bytes[offset]);
+ break;
+ case FLOAT4:
+ elementOffset = 0;
+ elementSize = 4;
+ break;
+ case FLOAT8:
+ elementOffset = 0;
+ elementSize = 8;
+ break;
+ case BLOB:
+ case PROTOBUF:
+ case INET4:
+ case CHAR:
+ case TEXT:
+ elementOffset = 1;
+ elementSize = bytes[offset];
+ break;
+ default:
+ elementOffset = 0;
+ elementSize = 0;
+ }
+ }
+
+ @Override
+ public void reset() throws IOException {
+ if (reader != null) {
+ reader.sync(0);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null)
+ reader.close();
+
+ if (tableStats != null) {
+ tableStats.setReadBytes(totalBytes);
+ tableStats.setNumRows(currentIdx);
+ }
+ }
+
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ @Override
+ public boolean isSelectable() {
+ return true;
+ }
+
+ @Override
+ public boolean isSplittable(){
+ return true;
+ }
+}