You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:29 UTC
[15/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
new file mode 100644
index 0000000..cb86f35
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.protobuf.Message;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.BitArray;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+public class RawFile {
+  private static final Log LOG = LogFactory.getLog(RawFile.class);
+
+  /**
+   * Sequential and seekable scanner over a local file written by {@link RawFileAppender}.
+   *
+   * <p>Record layout (as produced by the appender): a 4-byte record size that covers the
+   * whole record including this header, a 2-byte length of the null-flag bytes, the
+   * null-flag bytes, and then the encoded value of every non-null column in schema order.
+   * INT4/INT8 values are zig-zag varints; CHAR/TEXT/BLOB/PROTOBUF values are prefixed with a
+   * varint length.
+   */
+  public static class RawFileScanner extends FileScanner implements SeekableScanner {
+    private FileChannel channel;
+    private DataType[] columnTypes;
+
+    // NIO view over 'buf'; all decoding reads go through this view.
+    private ByteBuffer buffer;
+    // Pooled direct buffer that backs 'buffer'; released in close().
+    private ByteBuf buf;
+    private Tuple tuple;
+
+    private int headerSize = 0; // Header size of a tuple
+    private BitArray nullFlags;
+    private static final int RECORD_SIZE = 4;
+    private boolean eos = false;
+    private long startOffset;
+    private long endOffset;
+    private FileInputStream fis;
+    private long recordCount;
+    private long totalReadBytes;
+    // File offset just past the last byte read into 'buffer'.
+    private long filePosition;
+    // When true, 'buffer' holds no valid data and must be refilled before the next read.
+    private boolean forceFillBuffer;
+
+    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
+      super(conf, schema, meta, fragment);
+    }
+
+    /**
+     * Opens the fragment's path as a local file, allocates the read buffer, and positions
+     * the channel at the fragment's start offset.
+     *
+     * @throws IOException if the path cannot be turned into a local file or cannot be opened
+     */
+    public void init() throws IOException {
+      File file;
+      try {
+        if (fragment.getPath().toUri().getScheme() != null) {
+          file = new File(fragment.getPath().toUri());
+        } else {
+          file = new File(fragment.getPath().toString());
+        }
+      } catch (IllegalArgumentException iae) {
+        // new File(URI) rejects URIs that do not denote a local file
+        throw new IOException(iae);
+      }
+      fis = new FileInputStream(file);
+      channel = fis.getChannel();
+      filePosition = startOffset = fragment.getStartKey();
+      endOffset = fragment.getStartKey() + fragment.getLength();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", file size :" + channel.size()
+            + ", fragment length :" + fragment.getLength());
+      }
+
+      buf = BufferPool.directBuffer(64 * StorageUnit.KB);
+      buffer = buf.nioBuffer(0, buf.capacity());
+
+      columnTypes = new DataType[schema.size()];
+      for (int i = 0; i < schema.size(); i++) {
+        columnTypes[i] = schema.getColumn(i).getDataType();
+      }
+
+      tuple = new VTuple(columnTypes.length);
+      nullFlags = new BitArray(schema.size());
+      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize
+
+      // initial set position
+      if (fragment.getStartKey() > 0) {
+        channel.position(fragment.getStartKey());
+      }
+
+      forceFillBuffer = true;
+      super.init();
+    }
+
+    /** Returns the file offset of the next unread byte (the start of the next record). */
+    @Override
+    public long getNextOffset() throws IOException {
+      return filePosition - (forceFillBuffer ? 0 : buffer.remaining());
+    }
+
+    /**
+     * Positions the scanner so that the next record is read from {@code offset}.
+     *
+     * <p>If the offset already lies inside the buffered data, only the buffer position moves.
+     * Otherwise the channel is repositioned and the buffer is refilled.
+     *
+     * @throws IndexOutOfBoundsException if a refill is needed and {@code offset} lies outside
+     *         this fragment
+     */
+    @Override
+    public void seek(long offset) throws IOException {
+      eos = false;
+      filePosition = channel.position();
+
+      // do not fill the buffer if the offset is already included in the buffer.
+      if(!forceFillBuffer && filePosition > offset && offset > filePosition - buffer.limit()){
+        buffer.position((int)(offset - (filePosition - buffer.limit())));
+      } else {
+        if(offset < startOffset || offset > startOffset + fragment.getLength()){
+          throw new IndexOutOfBoundsException(String.format("range(%d, %d), offset: %d",
+              startOffset, startOffset + fragment.getLength(), offset));
+        }
+        channel.position(offset);
+        filePosition = offset;
+        buffer.clear();
+        forceFillBuffer = true;
+        fillBuffer();
+      }
+    }
+
+    /**
+     * Reads more bytes from the channel into the buffer. Unread bytes are kept (compacted to
+     * the front) unless the buffer is flagged for a full refill.
+     *
+     * @return false, after setting eos, when the channel is at end of file; true otherwise
+     */
+    private boolean fillBuffer() throws IOException {
+      if(!forceFillBuffer) buffer.compact();
+
+      int bytesRead = channel.read(buffer);
+      forceFillBuffer = false;
+      if (bytesRead == -1) {
+        eos = true;
+        return false;
+      } else {
+        buffer.flip(); // limit = end of valid data, position = 0
+        filePosition += bytesRead;
+        totalReadBytes += bytesRead;
+        return true;
+      }
+    }
+
+    /**
+     * Decode a ZigZag-encoded 32-bit value. ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint. (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n An unsigned 32-bit integer, stored in a signed int because
+     *          Java has no explicit unsigned support.
+     * @return A signed 32-bit integer.
+     */
+    public static int decodeZigZag32(final int n) {
+      return (n >>> 1) ^ -(n & 1);
+    }
+
+    /**
+     * Decode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint. (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n An unsigned 64-bit integer, stored in a signed long because
+     *          Java has no explicit unsigned support.
+     * @return A signed 64-bit integer.
+     */
+    public static long decodeZigZag64(final long n) {
+      return (n >>> 1) ^ -(n & 1);
+    }
+
+
+    /**
+     * Read a raw Varint from the stream. If larger than 32 bits, discard the
+     * upper bits.
+     *
+     * @throws IOException if more than 10 bytes are consumed without terminating the varint
+     */
+    public int readRawVarint32() throws IOException {
+      byte tmp = buffer.get();
+      if (tmp >= 0) {
+        return tmp;
+      }
+      int result = tmp & 0x7f;
+      if ((tmp = buffer.get()) >= 0) {
+        result |= tmp << 7;
+      } else {
+        result |= (tmp & 0x7f) << 7;
+        if ((tmp = buffer.get()) >= 0) {
+          result |= tmp << 14;
+        } else {
+          result |= (tmp & 0x7f) << 14;
+          if ((tmp = buffer.get()) >= 0) {
+            result |= tmp << 21;
+          } else {
+            result |= (tmp & 0x7f) << 21;
+            result |= (tmp = buffer.get()) << 28;
+            if (tmp < 0) {
+              // Discard upper 32 bits.
+              for (int i = 0; i < 5; i++) {
+                if (buffer.get() >= 0) {
+                  return result;
+                }
+              }
+              throw new IOException("Invalid Variable int32");
+            }
+          }
+        }
+      }
+      return result;
+    }
+
+    /**
+     * Read a raw Varint from the stream.
+     *
+     * @throws IOException if the varint does not terminate within 64 bits
+     */
+    public long readRawVarint64() throws IOException {
+      int shift = 0;
+      long result = 0;
+      while (shift < 64) {
+        final byte b = buffer.get();
+        result |= (long)(b & 0x7F) << shift;
+        if ((b & 0x80) == 0) {
+          return result;
+        }
+        shift += 7;
+      }
+      throw new IOException("Invalid Variable int64");
+    }
+
+    /**
+     * Decodes the next record.
+     *
+     * @return a new tuple, or null once the end of the file has been reached
+     */
+    @Override
+    public Tuple next() throws IOException {
+      if(eos) return null;
+
+      if (forceFillBuffer || buffer.remaining() < headerSize) {
+        if (!fillBuffer()) {
+          return null;
+        }
+      }
+
+      // backup the buffer state
+      int bufferLimit = buffer.limit();
+      int recordSize = buffer.getInt();
+      int nullFlagSize = buffer.getShort();
+
+      // Narrow the limit to the null-flag bytes while decoding them.
+      buffer.limit(buffer.position() + nullFlagSize);
+      nullFlags.fromByteBuffer(buffer);
+      // restore the start of record contents
+      buffer.limit(bufferLimit);
+      if (buffer.remaining() < (recordSize - headerSize)) {
+
+        // The rest of the record is not buffered: grow the buffer if the record cannot fit
+        // even after compaction, then read more bytes.
+        reSizeBuffer(recordSize);
+
+        if (!fillBuffer()) {
+          return null;
+        }
+      }
+
+      for (int i = 0; i < columnTypes.length; i++) {
+        // check if the i'th column is null
+        if (nullFlags.get(i)) {
+          tuple.put(i, DatumFactory.createNullDatum());
+          continue;
+        }
+
+        switch (columnTypes[i].getType()) {
+          case BOOLEAN :
+            tuple.put(i, DatumFactory.createBool(buffer.get()));
+            break;
+
+          case BIT :
+            tuple.put(i, DatumFactory.createBit(buffer.get()));
+            break;
+
+          case CHAR :
+            int realLen = readRawVarint32();
+            byte[] buf = new byte[realLen];
+            buffer.get(buf);
+            tuple.put(i, DatumFactory.createChar(buf));
+            break;
+
+          case INT2 :
+            tuple.put(i, DatumFactory.createInt2(buffer.getShort()));
+            break;
+
+          case INT4 :
+            tuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32())));
+            break;
+
+          case INT8 :
+            tuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64())));
+            break;
+
+          case FLOAT4 :
+            tuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
+            break;
+
+          case FLOAT8 :
+            tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
+            break;
+
+          case TEXT : {
+            int len = readRawVarint32();
+            byte [] strBytes = new byte[len];
+            buffer.get(strBytes);
+            tuple.put(i, DatumFactory.createText(strBytes));
+            break;
+          }
+
+          case BLOB : {
+            int len = readRawVarint32();
+            byte [] rawBytes = new byte[len];
+            buffer.get(rawBytes);
+            tuple.put(i, DatumFactory.createBlob(rawBytes));
+            break;
+          }
+
+          case PROTOBUF: {
+            int len = readRawVarint32();
+            byte [] rawBytes = new byte[len];
+            buffer.get(rawBytes);
+
+            ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]);
+            Message.Builder builder = factory.newBuilder();
+            builder.mergeFrom(rawBytes);
+            tuple.put(i, factory.createDatum(builder.build()));
+            break;
+          }
+
+          case INET4 :
+            byte [] ipv4Bytes = new byte[4];
+            buffer.get(ipv4Bytes);
+            tuple.put(i, DatumFactory.createInet4(ipv4Bytes));
+            break;
+
+          case DATE: {
+            int val = buffer.getInt();
+            // Integer.MIN_VALUE is decoded as NULL
+            if (val < Integer.MIN_VALUE + 1) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            } else {
+              tuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val));
+            }
+            break;
+          }
+          case TIME:
+          case TIMESTAMP: {
+            long val = buffer.getLong();
+            // Long.MIN_VALUE is decoded as NULL
+            if (val < Long.MIN_VALUE + 1) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            } else {
+              tuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val));
+            }
+            break;
+          }
+          case NULL_TYPE:
+            tuple.put(i, NullDatum.get());
+            break;
+
+          default:
+        }
+      }
+
+      recordCount++;
+
+      if(filePosition - buffer.remaining() >= endOffset){
+        eos = true;
+      }
+      return new VTuple(tuple);
+    }
+
+    /**
+     * Grows the read buffer when the unread bytes plus {@code writableBytes} would not fit
+     * even after compaction.
+     *
+     * <p>The unread bytes are moved to index 0 of the backing buffer. The new NIO view is
+     * positioned at 0 with its limit at the end of the unread data, which matches what
+     * {@link #fillBuffer()} expects when it compacts.
+     */
+    private void reSizeBuffer(int writableBytes){
+      if (buffer.capacity() - buffer.remaining() < writableBytes) {
+        buf.setIndex(buffer.position(), buffer.limit());
+        buf.markReaderIndex();
+        // discardReadBytes() always moves the unread bytes to index 0. discardSomeReadBytes()
+        // may leave them in place, and then the view below (position 0) would re-read
+        // already consumed bytes.
+        buf.discardReadBytes();
+        buf.ensureWritable(writableBytes);
+        buffer = buf.nioBuffer(0, buf.capacity());
+        buffer.limit(buf.writerIndex());
+      }
+    }
+
+    /** Rewinds the scanner to the beginning of the fragment. */
+    @Override
+    public void reset() throws IOException {
+      // reset the buffer
+      buffer.clear();
+      forceFillBuffer = true;
+      filePosition = fragment.getStartKey();
+      channel.position(filePosition);
+      eos = false;
+    }
+
+    /** Releases the pooled read buffer and closes the channel and input stream. */
+    @Override
+    public void close() throws IOException {
+      if(buf != null){
+        buffer.clear();
+        buffer = null;
+
+        buf.release();
+        buf = null;
+      }
+
+      IOUtils.cleanup(LOG, channel, fis);
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSplittable(){
+      return false;
+    }
+
+    @Override
+    public TableStats getInputStats() {
+      if(tableStats != null){
+        tableStats.setNumRows(recordCount);
+        tableStats.setReadBytes(totalReadBytes); // actual read bytes (scan + rescan * n)
+        tableStats.setNumBytes(fragment.getLength());
+      }
+      return tableStats;
+    }
+
+    /**
+     * Returns the fraction of this fragment read so far, in [0, 1]. Progress is measured
+     * relative to the fragment's start offset, so fragments that do not start at offset 0
+     * also begin at 0.
+     */
+    @Override
+    public float getProgress() {
+      if(eos) {
+        return 1.0f;
+      }
+
+      long readInFragment = filePosition - startOffset;
+      long fragmentLength = endOffset - startOffset;
+      if (readInFragment <= 0) {
+        return 0.0f;
+      }
+      if (fragmentLength <= 0) {
+        return 1.0f;
+      }
+      return Math.min(1.0f, ((float) readInFragment / fragmentLength));
+    }
+  }
+
+  /**
+   * Appender that writes tuples to a local file in the layout read by {@link RawFileScanner}.
+   * Records are assembled in a direct buffer and written to the file channel in bulk.
+   */
+  public static class RawFileAppender extends FileAppender {
+    // Space reserved before writing any fixed-size or varint column value. A zig-zag encoded
+    // 64-bit varint (INT8) can take up to 10 bytes, more than any other non-length-prefixed type.
+    private static final int MAX_FIXED_FIELD_SIZE = 10;
+
+    private FileChannel channel;
+    private RandomAccessFile randomAccessFile;
+    private DataType[] columnTypes;
+
+    // NIO view over 'buf' used to assemble records.
+    private ByteBuffer buffer;
+    // Pooled direct buffer backing 'buffer'; released in close().
+    private ByteBuf buf;
+    private BitArray nullFlags;
+    private int headerSize = 0;
+    private static final int RECORD_SIZE = 4;
+    // Total number of record bytes appended so far.
+    private long pos;
+
+    private TableStatistics stats;
+
+    public RawFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+                           Schema schema, TableMeta meta, Path workDir) throws IOException {
+      super(conf, taskAttemptId, schema, meta, workDir);
+    }
+
+    /**
+     * Opens the output path as a local file in read/write mode and allocates the write buffer.
+     *
+     * @throws IOException if the path cannot be turned into a local file or cannot be opened
+     */
+    public void init() throws IOException {
+      File file;
+      try {
+        if (path.toUri().getScheme() != null) {
+          file = new File(path.toUri());
+        } else {
+          file = new File(path.toString());
+        }
+      } catch (IllegalArgumentException iae) {
+        throw new IOException(iae);
+      }
+
+      randomAccessFile = new RandomAccessFile(file, "rw");
+      channel = randomAccessFile.getChannel();
+      pos = 0;
+
+      columnTypes = new DataType[schema.size()];
+      for (int i = 0; i < schema.size(); i++) {
+        columnTypes[i] = schema.getColumn(i).getDataType();
+      }
+
+      buf = BufferPool.directBuffer(64 * StorageUnit.KB);
+      buffer = buf.nioBuffer(0, buf.capacity());
+
+      // compute the number of bytes representing the null flags
+
+      nullFlags = new BitArray(schema.size());
+      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
+
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+
+      super.init();
+    }
+
+    @Override
+    public long getOffset() throws IOException {
+      return pos;
+    }
+
+    /** Writes all buffered bytes to the channel and empties the buffer. */
+    private void flushBuffer() throws IOException {
+      buffer.flip();
+      channel.write(buffer);
+      buffer.clear();
+    }
+
+    /**
+     * Makes room for {@code sizeToBeWritten} more bytes of the record being built at
+     * {@code recordOffset}.
+     *
+     * <p>When space is short, every complete record before {@code recordOffset} is written to
+     * the channel and the partial record is moved to the start of the buffer. If that still
+     * leaves too little room (a single record larger than the buffer), the buffer is grown,
+     * keeping the partial record intact.
+     *
+     * @return true if the partial record now starts at offset 0 (the caller must reset its
+     *         record offset), false if the buffer was left untouched
+     */
+    private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten)
+        throws IOException {
+      if (buffer.remaining() >= sizeToBeWritten) {
+        return false;
+      }
+
+      // write the bytes from 0 to the previous record.
+      int recordEnd = buffer.position();
+      buffer.limit(recordOffset);
+      buffer.flip();
+      channel.write(buffer);
+      buffer.position(recordOffset);
+      buffer.limit(recordEnd);
+      buffer.compact();
+
+      if (buffer.remaining() < sizeToBeWritten) {
+        // Grow the backing buffer. The writer index must cover the partial record so that
+        // the reallocation keeps its bytes.
+        int partialRecordSize = buffer.position();
+        buf.setIndex(0, partialRecordSize);
+        buf.ensureWritable(sizeToBeWritten);
+        buffer = buf.nioBuffer(0, buf.capacity());
+        buffer.position(partialRecordSize);
+      }
+      return true;
+    }
+
+    /**
+     * Encode a ZigZag-encoded 32-bit value. ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint. (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n A signed 32-bit integer.
+     * @return An unsigned 32-bit integer, stored in a signed int because
+     *         Java has no explicit unsigned support.
+     */
+    public static int encodeZigZag32(final int n) {
+      // Note: the right-shift must be arithmetic
+      return (n << 1) ^ (n >> 31);
+    }
+
+    /**
+     * Encode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint. (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n A signed 64-bit integer.
+     * @return An unsigned 64-bit integer, stored in a signed long because
+     *         Java has no explicit unsigned support.
+     */
+    public static long encodeZigZag64(final long n) {
+      // Note: the right-shift must be arithmetic
+      return (n << 1) ^ (n >> 63);
+    }
+
+    /**
+     * Encode and write a varint. {@code value} is treated as
+     * unsigned, so it won't be sign-extended if negative.
+     */
+    public void writeRawVarint32(int value) throws IOException {
+      while (true) {
+        if ((value & ~0x7F) == 0) {
+          buffer.put((byte) value);
+          return;
+        } else {
+          buffer.put((byte) ((value & 0x7F) | 0x80));
+          value >>>= 7;
+        }
+      }
+    }
+
+    /**
+     * Compute the number of bytes that would be needed to encode a varint.
+     * {@code value} is treated as unsigned, so it won't be sign-extended if
+     * negative.
+     */
+    public static int computeRawVarint32Size(final int value) {
+      if ((value & (0xffffffff <<  7)) == 0) return 1;
+      if ((value & (0xffffffff << 14)) == 0) return 2;
+      if ((value & (0xffffffff << 21)) == 0) return 3;
+      if ((value & (0xffffffff << 28)) == 0) return 4;
+      return 5;
+    }
+
+    /** Encode and write a varint (at most 10 bytes). */
+    public void writeRawVarint64(long value) throws IOException {
+      while (true) {
+        if ((value & ~0x7FL) == 0) {
+          buffer.put((byte) value);
+          return;
+        } else {
+          buffer.put((byte) ((value & 0x7F) | 0x80));
+          value >>>= 7;
+        }
+      }
+    }
+
+    /**
+     * Appends one tuple. The column values are written first, after a gap left for the
+     * header. The header (record size and null flags) is then filled in at the record's
+     * start offset.
+     *
+     * @throws IOException on write failure or for an unsupported column type
+     */
+    @Override
+    public void addTuple(Tuple t) throws IOException {
+
+      if (buffer.remaining() < headerSize) {
+        flushBuffer();
+      }
+
+      // skip the row header
+      int recordOffset = buffer.position();
+      buffer.position(recordOffset + headerSize);
+      // reset the null flags
+      nullFlags.clear();
+      for (int i = 0; i < schema.size(); i++) {
+        if (enabledStats) {
+          stats.analyzeField(i, t.get(i));
+        }
+
+        if (t.isNull(i)) {
+          nullFlags.set(i);
+          continue;
+        }
+
+        // reserve room for the largest fixed-size/varint value (a 10-byte varint64)
+        if (flushBufferAndReplace(recordOffset, MAX_FIXED_FIELD_SIZE)) {
+          recordOffset = 0;
+        }
+
+        switch(columnTypes[i].getType()) {
+          case NULL_TYPE:
+            nullFlags.set(i);
+            continue;
+
+          case BOOLEAN:
+          case BIT:
+            buffer.put(t.getByte(i));
+            break;
+
+          case INT2 :
+            buffer.putShort(t.getInt2(i));
+            break;
+
+          case INT4 :
+            writeRawVarint32(encodeZigZag32(t.getInt4(i)));
+            break;
+
+          case INT8 :
+            writeRawVarint64(encodeZigZag64(t.getInt8(i)));
+            break;
+
+          case FLOAT4 :
+            buffer.putFloat(t.getFloat4(i));
+            break;
+
+          case FLOAT8 :
+            buffer.putDouble(t.getFloat8(i));
+            break;
+
+          case CHAR:
+          case TEXT: {
+            byte [] strBytes = t.getBytes(i);
+            if (flushBufferAndReplace(recordOffset, strBytes.length + computeRawVarint32Size(strBytes.length))) {
+              recordOffset = 0;
+            }
+            writeRawVarint32(strBytes.length);
+            buffer.put(strBytes);
+            break;
+          }
+
+        case DATE:
+          buffer.putInt(t.getInt4(i));
+          break;
+
+        case TIME:
+        case TIMESTAMP:
+          buffer.putLong(t.getInt8(i));
+          break;
+
+          case BLOB : {
+            byte [] rawBytes = t.getBytes(i);
+            if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
+              recordOffset = 0;
+            }
+            writeRawVarint32(rawBytes.length);
+            buffer.put(rawBytes);
+            break;
+          }
+
+          case PROTOBUF: {
+            byte [] rawBytes = t.getBytes(i);
+            if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
+              recordOffset = 0;
+            }
+            writeRawVarint32(rawBytes.length);
+            buffer.put(rawBytes);
+            break;
+          }
+
+          case INET4 :
+            buffer.put(t.getBytes(i));
+            break;
+
+          default:
+            throw new IOException("Cannot support data type: " + columnTypes[i].getType());
+        }
+      }
+
+      // write a record header
+      int bufferPos = buffer.position();
+      buffer.position(recordOffset);
+      buffer.putInt(bufferPos - recordOffset);
+      byte [] flags = nullFlags.toArray();
+      buffer.putShort((short) flags.length);
+      buffer.put(flags);
+
+      pos += bufferPos - recordOffset;
+      buffer.position(bufferPos);
+
+      if (enabledStats) {
+        stats.incrementRow();
+      }
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if(buffer != null){
+        flushBuffer();
+      }
+    }
+
+    /** Flushes remaining records, releases the write buffer and closes the file. */
+    @Override
+    public void close() throws IOException {
+      flush();
+      if (enabledStats) {
+        stats.setNumBytes(getOffset());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
+      }
+
+      if(buf != null){
+        buffer.clear();
+        buffer = null;
+
+        buf.release();
+        buf = null;
+      }
+
+      IOUtils.cleanup(LOG, channel, randomAccessFile);
+    }
+
+    @Override
+    public TableStats getStats() {
+      if (enabledStats) {
+        stats.setNumBytes(pos);
+        return stats.getTableStat();
+      } else {
+        return null;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
new file mode 100644
index 0000000..8da6ada
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.BitArray;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+
+public class RowFile {
+  public static final Log LOG = LogFactory.getLog(RowFile.class);
+
+  // Int written in front of every sync hash.
+  private static final int SYNC_ESCAPE = -1;
+  // Length of the per-file sync marker (an MD5 digest).
+  private static final int SYNC_HASH_SIZE = 16;
+  // Escape int (4 bytes) followed by the sync hash.
+  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;
+  private final static int DEFAULT_BUFFER_SIZE = 65535;
+  // NOTE(review): mutable static shared by every scanner and appender in the JVM. The scanner
+  // overwrites it from each file header and the appender from the configuration.
+  public static int SYNC_INTERVAL;
+
+  /**
+   * Splittable scanner for RowFile.
+   *
+   * <p>File layout (as written by {@link RowFileAppender}): a header (sync interval int plus
+   * sync hash), then tuples, with sync blocks (escape int plus sync hash) inserted between
+   * them. Each tuple is a 2-byte null-flag length, the null-flag bytes, a 2-byte data length,
+   * and the data. A fragment that does not start at offset 0 begins at the first sync marker
+   * found at or after {@code start - SYNC_SIZE}. Reading stops at a sync marker that ends
+   * beyond the fragment end.
+   */
+  public static class RowFileScanner extends FileScanner {
+    private FileSystem fs;
+    private FSDataInputStream in;
+    private Tuple tuple;
+
+    private byte[] sync = new byte[SYNC_HASH_SIZE];
+    private byte[] checkSync = new byte[SYNC_HASH_SIZE];
+    private long start, end;
+
+    private ByteBuffer buffer;
+    private final int tupleHeaderSize;
+    private BitArray nullFlags;
+    // File offset corresponding to index 0 of 'buffer'.
+    private long bufferStartPos;
+
+    public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment)
+        throws IOException {
+      super(conf, schema, meta, fragment);
+
+      SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
+          ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal) * SYNC_SIZE;
+
+      nullFlags = new BitArray(schema.size());
+      tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
+      this.start = this.fragment.getStartKey();
+      this.end = this.start + this.fragment.getLength();
+    }
+
+    /**
+     * Opens the fragment's file, reads the file header, and positions the buffer at the
+     * first record of this fragment.
+     */
+    public void init() throws IOException {
+      // set default page size.
+      fs = fragment.getPath().getFileSystem(conf);
+      in = fs.open(fragment.getPath());
+      // At least one page, so that a zero-column schema does not yield an unfillable buffer.
+      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * Math.max(1, schema.size()));
+      buffer.flip();
+
+      readHeader();
+
+      // find the correct position from the start
+      if (this.start > in.getPos()) {
+        long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0;
+        in.seek(realStart);
+      }
+      bufferStartPos = in.getPos();
+      fillBuffer();
+
+      if (start != 0) {
+        // TODO: improve
+        boolean syncFound = false;
+        while (!syncFound) {
+          if (!ensureRemaining(SYNC_SIZE)) {
+            // EOF without another sync marker: this fragment holds no records. Drain the
+            // buffer so that next() returns null instead of underflowing here.
+            buffer.position(buffer.limit());
+            break;
+          }
+          buffer.mark();
+          syncFound = checkSync();
+          if (!syncFound) {
+            buffer.reset();
+            buffer.get(); // proceed one byte
+          }
+        }
+        bufferStartPos += buffer.position();
+        buffer.compact();
+        buffer.flip();
+      }
+
+      super.init();
+    }
+
+    /** Reads the sync interval and the sync hash stored at the head of the file. */
+    private void readHeader() throws IOException {
+      SYNC_INTERVAL = in.readInt();
+      StorageUtil.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
+    }
+
+    /**
+     * Find the sync from the front of the buffer
+     *
+     * @return return true if it succeeds to find the sync.
+     * @throws java.io.IOException
+     */
+    private boolean checkSync() throws IOException {
+      buffer.getInt();                           // escape
+      buffer.get(checkSync, 0, SYNC_HASH_SIZE);  // sync
+      return Arrays.equals(checkSync, sync);
+    }
+
+    /**
+     * Compacts the unread bytes to the front of the buffer and reads more data.
+     *
+     * @return the number of bytes read, or -1 at end of stream
+     */
+    private int fillBuffer() throws IOException {
+      bufferStartPos += buffer.position();
+      buffer.compact();
+      int remain = buffer.remaining();
+      int read = in.read(buffer);
+      if (read == -1) {
+        buffer.flip();
+        return read;
+      } else {
+        int totalRead = read;
+        if (remain > totalRead) {
+          read = in.read(buffer);
+          totalRead += read > 0 ? read : 0;
+        }
+        buffer.flip();
+        return totalRead;
+      }
+    }
+
+    /**
+     * Refills the buffer until at least {@code n} unread bytes are available.
+     *
+     * @return false if end of stream was reached first
+     */
+    private boolean ensureRemaining(int n) throws IOException {
+      while (buffer.remaining() < n) {
+        if (fillBuffer() < 0) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Decodes the next tuple of this fragment.
+     *
+     * @return a new tuple, or null at the end of the fragment or file
+     */
+    @Override
+    public Tuple next() throws IOException {
+      if (!ensureRemaining(SYNC_SIZE)) {
+        return null;
+      }
+
+      // A sync block may precede the next tuple; skip it, and stop once it lies past 'end'.
+      buffer.mark();
+      if (!checkSync()) {
+        buffer.reset();
+      } else {
+        if (bufferStartPos + buffer.position() > end) {
+          return null;
+        }
+      }
+
+      if (!ensureRemaining(tupleHeaderSize)) {
+        return null;
+      }
+
+      int i;
+      tuple = new VTuple(schema.size());
+
+      // The appender writes lengths as 16-bit values. Read them unsigned so that lengths of
+      // 32768..65535 are not decoded as negative numbers.
+      int nullFlagSize = buffer.getShort() & 0xFFFF;
+      byte[] nullFlagBytes = new byte[nullFlagSize];
+      buffer.get(nullFlagBytes, 0, nullFlagSize);
+      nullFlags = new BitArray(nullFlagBytes);
+      int tupleSize = buffer.getShort() & 0xFFFF;
+
+      if (!ensureRemaining(tupleSize)) {
+        return null;
+      }
+
+      Datum datum;
+      Column col;
+      for (i = 0; i < schema.size(); i++) {
+        if (!nullFlags.get(i)) {
+          col = schema.getColumn(i);
+          switch (col.getDataType().getType()) {
+            case BOOLEAN :
+              datum = DatumFactory.createBool(buffer.get());
+              tuple.put(i, datum);
+              break;
+
+            case BIT:
+              datum = DatumFactory.createBit(buffer.get());
+              tuple.put(i, datum );
+              break;
+
+            case CHAR :
+              // fixed-width field of the declared length; realLen bytes are significant
+              int realLen = buffer.getInt();
+              byte[] buf = new byte[col.getDataType().getLength()];
+              buffer.get(buf);
+              byte[] charBuf = Arrays.copyOf(buf, realLen);
+              tuple.put(i, DatumFactory.createChar(charBuf));
+              break;
+
+            case INT2 :
+              datum = DatumFactory.createInt2(buffer.getShort());
+              tuple.put(i, datum );
+              break;
+
+            case INT4 :
+              datum = DatumFactory.createInt4(buffer.getInt());
+              tuple.put(i, datum );
+              break;
+
+            case INT8 :
+              datum = DatumFactory.createInt8(buffer.getLong());
+              tuple.put(i, datum );
+              break;
+
+            case FLOAT4 :
+              datum = DatumFactory.createFloat4(buffer.getFloat());
+              tuple.put(i, datum);
+              break;
+
+            case FLOAT8 :
+              datum = DatumFactory.createFloat8(buffer.getDouble());
+              tuple.put(i, datum);
+              break;
+
+            case TEXT:
+              int bytelen = buffer.getShort() & 0xFFFF;
+              byte[] strbytes = new byte[bytelen];
+              buffer.get(strbytes, 0, bytelen);
+              datum = DatumFactory.createText(strbytes);
+              tuple.put(i, datum);
+              break;
+
+            case BLOB:
+              int bytesLen = buffer.getShort() & 0xFFFF;
+              byte [] bytesBuf = new byte[bytesLen];
+              buffer.get(bytesBuf);
+              datum = DatumFactory.createBlob(bytesBuf);
+              tuple.put(i, datum);
+              break;
+
+            case INET4 :
+              byte[] ipv4 = new byte[4];
+              buffer.get(ipv4, 0, 4);
+              datum = DatumFactory.createInet4(ipv4);
+              tuple.put(i, datum);
+              break;
+
+            default:
+              // NOTE(review): the appender also writes INET6, which is not decoded here.
+              break;
+          }
+        } else {
+          tuple.put(i, DatumFactory.createNullDatum());
+        }
+      }
+      return tuple;
+    }
+
+    /** Re-opens the fragment from its beginning, closing the current stream first. */
+    @Override
+    public void reset() throws IOException {
+      close();
+      init();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (in != null) {
+        in.close();
+      }
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSplittable(){
+      return true;
+    }
+  }
+
+  /**
+   * Appender that writes tuples in the RowFile layout.
+   *
+   * <p>A sync block is written once at least SYNC_INTERVAL bytes have passed since the last
+   * one, and again on close. Length fields are 16 bits wide.
+   */
+  public static class RowFileAppender extends FileAppender {
+    private FSDataOutputStream out;
+    private long lastSyncPos;
+    private FileSystem fs;
+    private byte[] sync;
+    // Scratch buffer holding one tuple's encoded column data.
+    private ByteBuffer buffer;
+
+    private BitArray nullFlags;
+    // statistics
+    private TableStatistics stats;
+
+    public RowFileAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
+                           final Schema schema, final TableMeta meta, final Path workDir)
+        throws IOException {
+      super(conf, taskAttemptId, schema, meta, workDir);
+    }
+
+    /**
+     * Creates the output file and writes the header.
+     *
+     * @throws FileNotFoundException if the parent directory does not exist
+     * @throws AlreadyExistsStorageException if the output file already exists
+     */
+    public void init() throws IOException {
+      SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
+          ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal);
+      fs = path.getFileSystem(conf);
+
+      if (!fs.exists(path.getParent())) {
+        throw new FileNotFoundException(path.toString());
+      }
+
+      if (fs.exists(path)) {
+        throw new AlreadyExistsStorageException(path);
+      }
+
+      sync = new byte[SYNC_HASH_SIZE];
+      lastSyncPos = 0;
+
+      out = fs.create(path);
+
+      // The sync marker is an MD5 of the path plus the current time; on failure it stays zeroed.
+      MessageDigest md;
+      try {
+        md = MessageDigest.getInstance("MD5");
+        md.update((path.toString()+System.currentTimeMillis()).getBytes());
+        sync = md.digest();
+      } catch (NoSuchAlgorithmException e) {
+        LOG.error(e);
+      }
+
+      writeHeader();
+
+      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+
+      nullFlags = new BitArray(schema.size());
+
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+
+      super.init();
+    }
+
+    /** Writes the sync interval and sync hash at the head of the file. */
+    private void writeHeader() throws IOException {
+      out.writeInt(SYNC_INTERVAL);
+      out.write(sync);
+      out.flush();
+      lastSyncPos = out.getPos();
+    }
+
+    /**
+     * Appends one tuple: a 2-byte null-flag length, the null flags, a 2-byte data length,
+     * and the data.
+     *
+     * <p>NOTE(review): TEXT/BLOB lengths and the data length are written as 16-bit values,
+     * and the scratch buffer holds 65535 bytes, so larger tuples are not representable.
+     */
+    @Override
+    public void addTuple(Tuple t) throws IOException {
+      checkAndWriteSync();
+      Column col;
+
+      buffer.clear();
+      nullFlags.clear();
+
+      for (int i = 0; i < schema.size(); i++) {
+        if (enabledStats) {
+          stats.analyzeField(i, t.get(i));
+        }
+
+        if (t.isNull(i)) {
+          nullFlags.set(i);
+        } else {
+          col = schema.getColumn(i);
+          switch (col.getDataType().getType()) {
+            case BOOLEAN:
+              buffer.put(t.get(i).asByte());
+              break;
+            case BIT:
+              buffer.put(t.get(i).asByte());
+              break;
+            case CHAR:
+              byte[] src = t.get(i).asByteArray();
+              byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
+              buffer.putInt(src.length);
+              buffer.put(dst);
+              break;
+            case TEXT:
+              byte [] strbytes = t.get(i).asByteArray();
+              buffer.putShort((short)strbytes.length);
+              buffer.put(strbytes, 0, strbytes.length);
+              break;
+            case INT2:
+              buffer.putShort(t.get(i).asInt2());
+              break;
+            case INT4:
+              buffer.putInt(t.get(i).asInt4());
+              break;
+            case INT8:
+              buffer.putLong(t.get(i).asInt8());
+              break;
+            case FLOAT4:
+              buffer.putFloat(t.get(i).asFloat4());
+              break;
+            case FLOAT8:
+              buffer.putDouble(t.get(i).asFloat8());
+              break;
+            case BLOB:
+              byte [] bytes = t.get(i).asByteArray();
+              buffer.putShort((short)bytes.length);
+              buffer.put(bytes);
+              break;
+            case INET4:
+              buffer.put(t.get(i).asByteArray());
+              break;
+            case INET6:
+              buffer.put(t.get(i).asByteArray());
+              break;
+            case NULL_TYPE:
+              nullFlags.set(i);
+              break;
+            default:
+              break;
+          }
+        }
+      }
+
+      byte[] bytes = nullFlags.toArray();
+      out.writeShort(bytes.length);
+      out.write(bytes);
+
+      bytes = buffer.array();
+      int dataLen = buffer.position();
+      out.writeShort(dataLen);
+      out.write(bytes, 0, dataLen);
+
+      // Statistical section
+      if (enabledStats) {
+        stats.incrementRow();
+      }
+    }
+
+    @Override
+    public long getOffset() throws IOException {
+      return out.getPos();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    /** Writes the trailing sync block, records the final file size, and closes the stream. */
+    @Override
+    public void close() throws IOException {
+      if (out != null) {
+        sync();
+        out.flush();
+        if (enabledStats) {
+          // taken after the trailing sync so that it equals the written file length
+          stats.setNumBytes(out.getPos());
+        }
+        out.close();
+      }
+    }
+
+    /** Writes a sync block unless one was just written at the current position. */
+    private void sync() throws IOException {
+      if (lastSyncPos != out.getPos()) {
+        out.writeInt(SYNC_ESCAPE);
+        out.write(sync);
+        lastSyncPos = out.getPos();
+      }
+    }
+
+    private void checkAndWriteSync() throws IOException {
+      if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
+        sync();
+      }
+    }
+
+    @Override
+    public TableStats getStats() {
+      if (enabledStats) {
+        return stats.getTableStat();
+      } else {
+        return null;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java
new file mode 100644
index 0000000..3579674
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+ /**
+  * A {@link LineReader} that can report whether one more record must be read after
+  * the end of its split. This implementation never requires one.
+  */
+ public class SplitLineReader extends LineReader {
+
+   /**
+    * @param in the stream to read lines from
+    * @param conf configuration handed to {@link LineReader}
+    * @param recordDelimiterBytes the record delimiter
+    */
+   public SplitLineReader(InputStream in, Configuration conf,
+                          byte[] recordDelimiterBytes) throws IOException {
+     super(in, conf, recordDelimiterBytes);
+   }
+
+   /**
+    * @param in the stream to read lines from
+    * @param recordDelimiterBytes the record delimiter
+    */
+   public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
+     super(in, recordDelimiterBytes);
+   }
+
+   /**
+    * Whether an extra record past the split boundary has to be consumed.
+    *
+    * @return always {@code false} in this implementation
+    */
+   public boolean needAdditionalRecordAfterSplit() {
+     return false;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
new file mode 100644
index 0000000..dbb8bd0
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.FileAppender;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * FileAppender for writing to Avro files.
+ */
+public class AvroAppender extends FileAppender {
+ private TableStatistics stats;
+ private Schema avroSchema;
+ private List<Schema.Field> avroFields;
+ private DataFileWriter<GenericRecord> dataFileWriter;
+
+ /**
+ * Creates a new AvroAppender.
+ *
+ * @param conf Configuration properties.
+ * @param taskAttemptId The task attempt id
+ * @param schema The table schema.
+ * @param meta The table metadata.
+ * @param workDir The path of the Parquet file to write to.
+ */
+ public AvroAppender(Configuration conf,
+ QueryUnitAttemptId taskAttemptId,
+ org.apache.tajo.catalog.Schema schema,
+ TableMeta meta, Path workDir) throws IOException {
+ super(conf, taskAttemptId, schema, meta, workDir);
+ }
+
+ /**
+ * Initializes the Appender.
+ */
+ public void init() throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ if (!fs.exists(path.getParent())) {
+ throw new FileNotFoundException(path.toString());
+ }
+ FSDataOutputStream outputStream = fs.create(path);
+
+ avroSchema = AvroUtil.getAvroSchema(meta, conf);
+ avroFields = avroSchema.getFields();
+
+ DatumWriter<GenericRecord> datumWriter =
+ new GenericDatumWriter<GenericRecord>(avroSchema);
+ dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
+ dataFileWriter.create(avroSchema, outputStream);
+
+ if (enabledStats) {
+ this.stats = new TableStatistics(schema);
+ }
+ super.init();
+ }
+
+ /**
+ * Gets the current offset. Tracking offsets is currenly not implemented, so
+ * this method always returns 0.
+ *
+ * @return 0
+ */
+ @Override
+ public long getOffset() throws IOException {
+ return 0;
+ }
+
+ private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) {
+ if (tuple.get(i) instanceof NullDatum) {
+ return null;
+ }
+ switch (avroType) {
+ case NULL:
+ return null;
+ case BOOLEAN:
+ return tuple.getBool(i);
+ case INT:
+ return tuple.getInt4(i);
+ case LONG:
+ return tuple.getInt8(i);
+ case FLOAT:
+ return tuple.getFloat4(i);
+ case DOUBLE:
+ return tuple.getFloat8(i);
+ case BYTES:
+ case FIXED:
+ return ByteBuffer.wrap(tuple.getBytes(i));
+ case STRING:
+ return tuple.getText(i);
+ default:
+ throw new RuntimeException("Unknown primitive type.");
+ }
+ }
+
+ /**
+ * Write a Tuple to the Avro file.
+ *
+ * @param tuple The Tuple to write.
+ */
+ @Override
+ public void addTuple(Tuple tuple) throws IOException {
+ GenericRecord record = new GenericData.Record(avroSchema);
+ for (int i = 0; i < schema.size(); ++i) {
+ Column column = schema.getColumn(i);
+ if (enabledStats) {
+ stats.analyzeField(i, tuple.get(i));
+ }
+ Object value;
+ Schema.Field avroField = avroFields.get(i);
+ Schema.Type avroType = avroField.schema().getType();
+ switch (avroType) {
+ case NULL:
+ case BOOLEAN:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case BYTES:
+ case STRING:
+ case FIXED:
+ value = getPrimitive(tuple, i, avroType);
+ break;
+ case RECORD:
+ throw new RuntimeException("Avro RECORD not supported.");
+ case ENUM:
+ throw new RuntimeException("Avro ENUM not supported.");
+ case MAP:
+ throw new RuntimeException("Avro MAP not supported.");
+ case UNION:
+ List<Schema> schemas = avroField.schema().getTypes();
+ if (schemas.size() != 2) {
+ throw new RuntimeException("Avro UNION not supported.");
+ }
+ if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
+ value = getPrimitive(tuple, i, schemas.get(1).getType());
+ } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
+ value = getPrimitive(tuple, i, schemas.get(0).getType());
+ } else {
+ throw new RuntimeException("Avro UNION not supported.");
+ }
+ break;
+ default:
+ throw new RuntimeException("Unknown type: " + avroType);
+ }
+ record.put(i, value);
+ }
+ dataFileWriter.append(record);
+
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ }
+
+ /**
+ * Flushes the current state of the file.
+ */
+ @Override
+ public void flush() throws IOException {
+ dataFileWriter.flush();
+ }
+
+ /**
+ * Closes the Appender.
+ */
+ @Override
+ public void close() throws IOException {
+ dataFileWriter.close();
+ }
+
+ /**
+ * If table statistics is enabled, retrieve the table statistics.
+ *
+ * @return Table statistics if enabled or null otherwise.
+ */
+ @Override
+ public TableStats getStats() {
+ if (enabledStats) {
+ return stats.getTableStat();
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
new file mode 100644
index 0000000..51594df
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * FileScanner for reading Avro files
+ */
+public class AvroScanner extends FileScanner {
+ private Schema avroSchema;
+ private List<Schema.Field> avroFields;
+ private DataFileReader<GenericRecord> dataFileReader;
+ private int[] projectionMap;
+
+ /**
+ * Creates a new AvroScanner.
+ *
+ * @param conf
+ * @param schema
+ * @param meta
+ * @param fragment
+ */
+ public AvroScanner(Configuration conf,
+ final org.apache.tajo.catalog.Schema schema,
+ final TableMeta meta, final Fragment fragment) {
+ super(conf, schema, meta, fragment);
+ }
+
+ /**
+ * Initializes the AvroScanner.
+ */
+ @Override
+ public void init() throws IOException {
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+ prepareProjection(targets);
+
+ avroSchema = AvroUtil.getAvroSchema(meta, conf);
+ avroFields = avroSchema.getFields();
+
+ DatumReader<GenericRecord> datumReader =
+ new GenericDatumReader<GenericRecord>(avroSchema);
+ SeekableInput input = new FsInput(fragment.getPath(), conf);
+ dataFileReader = new DataFileReader<GenericRecord>(input, datumReader);
+ super.init();
+ }
+
+ private void prepareProjection(Column[] targets) {
+ projectionMap = new int[targets.length];
+ for (int i = 0; i < targets.length; ++i) {
+ projectionMap[i] = schema.getColumnId(targets[i].getQualifiedName());
+ }
+ }
+
+ private static String fromAvroString(Object value) {
+ if (value instanceof Utf8) {
+ Utf8 utf8 = (Utf8)value;
+ return utf8.toString();
+ }
+ return value.toString();
+ }
+
+ private static Schema getNonNull(Schema schema) {
+ if (!schema.getType().equals(Schema.Type.UNION)) {
+ return schema;
+ }
+ List<Schema> schemas = schema.getTypes();
+ if (schemas.size() != 2) {
+ return schema;
+ }
+ if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
+ return schemas.get(1);
+ } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
+ return schemas.get(0);
+ } else {
+ return schema;
+ }
+ }
+
+ private Datum convertInt(Object value, TajoDataTypes.Type tajoType) {
+ int intValue = (Integer)value;
+ switch (tajoType) {
+ case BIT:
+ return DatumFactory.createBit((byte)(intValue & 0xff));
+ case INT2:
+ return DatumFactory.createInt2((short)intValue);
+ default:
+ return DatumFactory.createInt4(intValue);
+ }
+ }
+
+ private Datum convertBytes(Object value, TajoDataTypes.Type tajoType,
+ DataType dataType) {
+ ByteBuffer buffer = (ByteBuffer)value;
+ byte[] bytes = new byte[buffer.capacity()];
+ buffer.get(bytes, 0, bytes.length);
+ switch (tajoType) {
+ case INET4:
+ return DatumFactory.createInet4(bytes);
+ case PROTOBUF:
+ try {
+ ProtobufDatumFactory factory =
+ ProtobufDatumFactory.get(dataType.getCode());
+ Message.Builder builder = factory.newBuilder();
+ builder.mergeFrom(bytes);
+ return factory.createDatum(builder);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ default:
+ return new BlobDatum(bytes);
+ }
+ }
+
+ private Datum convertString(Object value, TajoDataTypes.Type tajoType) {
+ switch (tajoType) {
+ case CHAR:
+ return DatumFactory.createChar(fromAvroString(value));
+ default:
+ return DatumFactory.createText(fromAvroString(value));
+ }
+ }
+
+ /**
+ * Reads the next Tuple from the Avro file.
+ *
+ * @return The next Tuple from the Avro file or null if end of file is
+ * reached.
+ */
+ @Override
+ public Tuple next() throws IOException {
+ if (!dataFileReader.hasNext()) {
+ return null;
+ }
+
+ Tuple tuple = new VTuple(schema.size());
+ GenericRecord record = dataFileReader.next();
+ for (int i = 0; i < projectionMap.length; ++i) {
+ int columnIndex = projectionMap[i];
+ Object value = record.get(columnIndex);
+ if (value == null) {
+ tuple.put(columnIndex, NullDatum.get());
+ continue;
+ }
+
+ // Get Avro type.
+ Schema.Field avroField = avroFields.get(columnIndex);
+ Schema nonNullAvroSchema = getNonNull(avroField.schema());
+ Schema.Type avroType = nonNullAvroSchema.getType();
+
+ // Get Tajo type.
+ Column column = schema.getColumn(columnIndex);
+ DataType dataType = column.getDataType();
+ TajoDataTypes.Type tajoType = dataType.getType();
+ switch (avroType) {
+ case NULL:
+ tuple.put(columnIndex, NullDatum.get());
+ break;
+ case BOOLEAN:
+ tuple.put(columnIndex, DatumFactory.createBool((Boolean)value));
+ break;
+ case INT:
+ tuple.put(columnIndex, convertInt(value, tajoType));
+ break;
+ case LONG:
+ tuple.put(columnIndex, DatumFactory.createInt8((Long)value));
+ break;
+ case FLOAT:
+ tuple.put(columnIndex, DatumFactory.createFloat4((Float)value));
+ break;
+ case DOUBLE:
+ tuple.put(columnIndex, DatumFactory.createFloat8((Double)value));
+ break;
+ case BYTES:
+ tuple.put(columnIndex, convertBytes(value, tajoType, dataType));
+ break;
+ case STRING:
+ tuple.put(columnIndex, convertString(value, tajoType));
+ break;
+ case RECORD:
+ throw new RuntimeException("Avro RECORD not supported.");
+ case ENUM:
+ throw new RuntimeException("Avro ENUM not supported.");
+ case MAP:
+ throw new RuntimeException("Avro MAP not supported.");
+ case UNION:
+ throw new RuntimeException("Avro UNION not supported.");
+ case FIXED:
+ tuple.put(columnIndex, new BlobDatum(((GenericFixed)value).bytes()));
+ break;
+ default:
+ throw new RuntimeException("Unknown type.");
+ }
+ }
+ return tuple;
+ }
+
+ /**
+ * Resets the scanner
+ */
+ @Override
+ public void reset() throws IOException {
+ }
+
+ /**
+ * Closes the scanner.
+ */
+ @Override
+ public void close() throws IOException {
+ if (dataFileReader != null) {
+ dataFileReader.close();
+ }
+ }
+
+ /**
+ * Returns whether this scanner is projectable.
+ *
+ * @return true
+ */
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ /**
+ * Returns whether this scanner is selectable.
+ *
+ * @return false
+ */
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ /**
+ * Returns whether this scanner is splittable.
+ *
+ * @return false
+ */
+ @Override
+ public boolean isSplittable() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
new file mode 100644
index 0000000..0d14c3d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.StorageConstants;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+ /**
+  * Helpers for resolving the Avro schema attached to a Tajo table.
+  */
+ public class AvroUtil {
+   /**
+    * Returns the Avro schema configured on {@code meta}.
+    *
+    * The inline schema literal option takes precedence. Otherwise the schema is loaded
+    * from the schema URL option: it is fetched over the network when the URL has an
+    * http:// or https:// scheme (case-insensitive), and read through the Hadoop
+    * FileSystem otherwise.
+    *
+    * @throws RuntimeException if neither schema option is set.
+    * @throws IOException if the schema cannot be read.
+    */
+   public static Schema getAvroSchema(TableMeta meta, Configuration conf)
+       throws IOException {
+
+     boolean isSchemaLiteral = meta.containsOption(StorageConstants.AVRO_SCHEMA_LITERAL);
+     boolean isSchemaUrl = meta.containsOption(StorageConstants.AVRO_SCHEMA_URL);
+     if (!isSchemaLiteral && !isSchemaUrl) {
+       throw new RuntimeException("No Avro schema for table.");
+     }
+     if (isSchemaLiteral) {
+       String schema = meta.getOption(StorageConstants.AVRO_SCHEMA_LITERAL);
+       return new Schema.Parser().parse(schema);
+     }
+
+     String schemaURL = meta.getOption(StorageConstants.AVRO_SCHEMA_URL);
+     if (isHttpUrl(schemaURL)) {
+       return getAvroSchemaFromHttp(schemaURL);
+     } else {
+       return getAvroSchemaFromFileSystem(schemaURL, conf);
+     }
+   }
+
+   /**
+    * True if {@code url} starts with an http:// or https:// scheme, ignoring case.
+    * A bare "http" prefix is not enough: a relative path such as "httpd/x.avsc" is a file.
+    */
+   private static boolean isHttpUrl(String url) {
+     return url.regionMatches(true, 0, "http://", 0, 7)
+         || url.regionMatches(true, 0, "https://", 0, 8);
+   }
+
+   /** Downloads and parses a schema from an HTTP(S) URL, closing the stream afterwards. */
+   public static Schema getAvroSchemaFromHttp(String schemaURL) throws IOException {
+     InputStream inputStream = new URL(schemaURL).openStream();
+
+     try {
+       return new Schema.Parser().parse(inputStream);
+     } finally {
+       IOUtils.closeStream(inputStream);
+     }
+   }
+
+   /** Reads and parses a schema from a Hadoop FileSystem path, closing the stream afterwards. */
+   public static Schema getAvroSchemaFromFileSystem(String schemaURL, Configuration conf) throws IOException {
+     Path schemaPath = new Path(schemaURL);
+     FileSystem fs = schemaPath.getFileSystem(conf);
+     FSDataInputStream inputStream = fs.open(schemaPath);
+
+     try {
+       return new Schema.Parser().parse(inputStream);
+     } finally {
+       IOUtils.closeStream(inputStream);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/package-info.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/package-info.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/package-info.java
new file mode 100644
index 0000000..40d1545
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/package-info.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * <p>
+ * Provides read and write support for Avro files. Avro schemas are
+ * converted to Tajo schemas according to the following mapping of Avro
+ * and Tajo types:
+ * </p>
+ *
+ * <table>
+ * <tr>
+ * <th>Avro type</th>
+ * <th>Tajo type</th>
+ * </tr>
+ * <tr>
+ * <td>NULL</td>
+ * <td>NULL_TYPE</td>
+ * </tr>
+ * <tr>
+ * <td>BOOLEAN</td>
+ * <td>BOOLEAN</td>
+ * </tr>
+ * <tr>
+ * <td>INT</td>
+ * <td>INT4</td>
+ * </tr>
+ * <tr>
+ * <td>LONG</td>
+ * <td>INT8</td>
+ * </tr>
+ * <tr>
+ * <td>FLOAT</td>
+ * <td>FLOAT4</td>
+ * </tr>
+ * <tr>
+ * <td>DOUBLE</td>
+ * <td>FLOAT8</td>
+ * </tr>
+ * <tr>
+ * <td>BYTES</td>
+ * <td>BLOB</td>
+ * </tr>
+ * <tr>
+ * <td>STRING</td>
+ * <td>TEXT</td>
+ * </tr>
+ * <tr>
+ * <td>FIXED</td>
+ * <td>BLOB</td>
+ * </tr>
+ * <tr>
+ * <td>RECORD</td>
+ * <td>Not currently supported</td>
+ * </tr>
+ * <tr>
+ * <td>ENUM</td>
+ * <td>Not currently supported.</td>
+ * </tr>
+ * <tr>
+ * <td>MAP</td>
+ * <td>Not currently supported.</td>
+ * </tr>
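+ * <tr>
+ * <td>UNION</td>
+ * <td>Only two-branch unions with NULL (for example ["null", "int"]) are supported;
+ * they map to the type of the non-null branch. Other unions are not supported.</td>
+ * </tr>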
+ * </table>
+ */
+
+package org.apache.tajo.storage.avro;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
new file mode 100644
index 0000000..ac413ca
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.storage.StorageFragmentProtos.*;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+ /**
+  * Describes a byte range ({@code startOffset}, {@code length}) of one file, together
+  * with the hosts and disk ids that store it.
+  */
+ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable {
+   @Expose private String tableName; // required
+   @Expose private Path uri; // required
+   @Expose public Long startOffset; // required
+   @Expose public Long length; // required
+
+   private String[] hosts; // Datanode hostnames
+   @Expose private int[] diskIds;
+
+   /** Deserializes a fragment from the serialized bytes of a {@code FileFragmentProto}. */
+   public FileFragment(ByteString raw) throws InvalidProtocolBufferException {
+     FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
+     builder.mergeFrom(raw);
+     init(builder.build());
+   }
+
+   public FileFragment(String tableName, Path uri, BlockLocation blockLocation)
+       throws IOException {
+     this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null);
+   }
+
+   public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, int[] diskIds) {
+     this.set(tableName, uri, start, length, hosts, diskIds);
+   }
+
+   // Non splittable
+   public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) {
+     this.set(tableName, uri, start, length, hosts, null);
+   }
+
+   public FileFragment(String fragmentId, Path path, long start, long length) {
+     this.set(fragmentId, path, start, length, null, null);
+   }
+
+   public FileFragment(FileFragmentProto proto) {
+     init(proto);
+   }
+
+   private void init(FileFragmentProto proto) {
+     int[] diskIds = new int[proto.getDiskIdsList().size()];
+     int i = 0;
+     for (Integer eachValue : proto.getDiskIdsList()) {
+       diskIds[i++] = eachValue;
+     }
+     this.set(proto.getId(), new Path(proto.getPath()),
+         proto.getStartOffset(), proto.getLength(),
+         proto.getHostsList().toArray(new String[]{}),
+         diskIds);
+   }
+
+   private void set(String tableName, Path path, long start,
+                    long length, String[] hosts, int[] diskIds) {
+     this.tableName = tableName;
+     this.uri = path;
+     this.startOffset = start;
+     this.length = length;
+     this.hosts = hosts;
+     this.diskIds = diskIds;
+   }
+
+   /**
+    * Get the list of hosts (hostname) hosting this block
+    */
+   public String[] getHosts() {
+     if (hosts == null) {
+       this.hosts = new String[0];
+     }
+     return hosts;
+   }
+
+   /**
+    * Get the list of Disk Ids
+    * Unknown disk is -1. Others 0 ~ N
+    */
+   public int[] getDiskIds() {
+     if (diskIds == null) {
+       this.diskIds = new int[getHosts().length];
+       Arrays.fill(this.diskIds, -1);
+     }
+     return diskIds;
+   }
+
+   public void setDiskIds(int[] diskIds){
+     this.diskIds = diskIds;
+   }
+
+   @Override
+   public String getTableName() {
+     return this.tableName;
+   }
+
+   public Path getPath() {
+     return this.uri;
+   }
+
+   public void setPath(Path path) {
+     this.uri = path;
+   }
+
+   public Long getStartKey() {
+     return this.startOffset;
+   }
+
+   @Override
+   public String getKey() {
+     return this.uri.toString();
+   }
+
+   @Override
+   public long getLength() {
+     return this.length;
+   }
+
+   @Override
+   public boolean isEmpty() {
+     return this.length <= 0;
+   }
+
+   /**
+    * Orders fragments by path, then by start offset, then by length (the same fields
+    * {@link #equals(Object)} compares).
+    *
+    * The offset range of tablets <b>MUST NOT</b> be overlapped.
+    *
+    * @param t the fragment to compare with
+    * @return a negative, zero or positive value as this fragment sorts before, equal to,
+    *         or after {@code t}
+    */
+   @Override
+   public int compareTo(FileFragment t) {
+     // Compare paths rather than returning a constant for different paths, which
+     // violated antisymmetry and could make sorting fail.
+     int byPath = getPath().compareTo(t.getPath());
+     if (byPath != 0) {
+       return byPath;
+     }
+     int byStart = compareLongs(this.getStartKey(), t.getStartKey());
+     if (byStart != 0) {
+       return byStart;
+     }
+     return compareLongs(this.getLength(), t.getLength());
+   }
+
+   /** Overflow-safe three-way comparison of two longs. */
+   private static int compareLongs(long a, long b) {
+     return a < b ? -1 : (a == b ? 0 : 1);
+   }
+
+   @Override
+   public boolean equals(Object o) {
+     if (o instanceof FileFragment) {
+       FileFragment t = (FileFragment) o;
+       if (getPath().equals(t.getPath())
+           && TUtil.checkEquals(t.getStartKey(), this.getStartKey())
+           && TUtil.checkEquals(t.getLength(), this.getLength())) {
+         return true;
+       }
+     }
+     return false;
+   }
+
+   /**
+    * Hashes exactly the fields used by {@link #equals(Object)}. The table name is
+    * excluded because equals ignores it.
+    */
+   @Override
+   public int hashCode() {
+     return Objects.hashCode(uri, startOffset, length);
+   }
+
+   /**
+    * Returns a copy whose host and disk-id arrays are independent of this fragment's.
+    */
+   @Override
+   public Object clone() throws CloneNotSupportedException {
+     FileFragment frag = (FileFragment) super.clone();
+     // super.clone() is shallow; copy the mutable arrays so neither side can alter the other.
+     frag.hosts = hosts == null ? null : hosts.clone();
+     frag.diskIds = diskIds == null ? null : diskIds.clone();
+     return frag;
+   }
+
+   @Override
+   public String toString() {
+     return "\"fragment\": {\"id\": \"" + tableName + "\", \"path\": \"" + getPath()
+         + "\", \"start\": " + this.getStartKey() + ", \"length\": " + getLength() + "}";
+   }
+
+   /** Serializes this fragment into a {@link FragmentProto} wrapping a {@code FileFragmentProto}. */
+   public FragmentProto getProto() {
+     FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
+     builder.setId(this.tableName);
+     builder.setStartOffset(this.startOffset);
+     builder.setLength(this.length);
+     builder.setPath(this.uri.toString());
+     if (diskIds != null) {
+       List<Integer> idList = new ArrayList<Integer>(diskIds.length);
+       for (int eachId : diskIds) {
+         idList.add(eachId);
+       }
+       builder.addAllDiskIds(idList);
+     }
+
+     if (hosts != null) {
+       builder.addAllHosts(TUtil.newList(hosts));
+     }
+
+     FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
+     fragmentBuilder.setId(this.tableName);
+     fragmentBuilder.setStoreType(StoreType.CSV.name());
+     fragmentBuilder.setContents(builder.buildPartial().toByteString());
+     return fragmentBuilder.build();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexMethod.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
new file mode 100644
index 0000000..a6af19b
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.TupleComparator;
+
+import java.io.IOException;
+
+public interface IndexMethod {
+ IndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
+ TupleComparator comparator) throws IOException;
+ IndexReader getIndexReader(final Path fileName, Schema keySchema,
+ TupleComparator comparator) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexReader.java
new file mode 100644
index 0000000..3ae5c9d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexReader.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+/**
+ * Read-side access to an index of key tuples and their associated offsets.
+ */
+public interface IndexReader {
+
+  /**
+   * Finds the offset associated with a key that is equal to the given key.
+   *
+   * @param key the key tuple to look up
+   * @return the offset recorded for {@code key}; the value returned when no equal
+   *         key exists is implementation-defined (NOTE(review): confirm against
+   *         implementations)
+   * @throws IOException if an I/O error occurs while reading the index
+   */
+  long find(Tuple key) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
new file mode 100644
index 0000000..04738f8
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.storage.index;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+/**
+ * Write-side access to an index: records (key, offset) pairs.
+ *
+ * <p>Implements {@link java.io.Closeable} so writers can be managed with
+ * try-with-resources; the abstract {@link #close()} already matches that contract.
+ */
+public abstract class IndexWriter implements java.io.Closeable {
+
+  /**
+   * Records that {@code key} is associated with {@code offset}.
+   *
+   * @param key    the key tuple to index
+   * @param offset the offset to associate with {@code key}
+   * @throws IOException if an I/O error occurs while writing the index
+   */
+  public abstract void write(Tuple key, long offset) throws IOException;
+
+  /**
+   * Closes this writer and releases any resources it holds.
+   *
+   * @throws IOException if an I/O error occurs while closing
+   */
+  @Override
+  public abstract void close() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
new file mode 100644
index 0000000..0c07b4a
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.storage.index;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+/**
+ * An {@link IndexReader} over ordered keys that supports "equal to or greater than"
+ * lookups and sequential retrieval of subsequent offsets.
+ */
+public interface OrderIndexReader extends IndexReader {
+
+  /**
+   * Finds the offset corresponding to the key which is equal to or greater than
+   * the given key.
+   *
+   * @param key     the key to find
+   * @param nextKey implementation-defined lookup flag (NOTE(review): presumably
+   *                selects whether the next greater key is used when no equal key
+   *                exists — confirm against implementations)
+   * @return the offset of the matched key
+   * @throws IOException if an I/O error occurs while reading the index
+   */
+  long find(Tuple key, boolean nextKey) throws IOException;
+
+  /**
+   * Returns the offset that follows the one produced by the latest call to
+   * {@link #find(Tuple, boolean)} or {@link #next()}.
+   *
+   * @return the next offset
+   * @throws IOException if an I/O error occurs while reading the index
+   */
+  long next() throws IOException;
+}