Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:29 UTC

[15/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
new file mode 100644
index 0000000..cb86f35
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.protobuf.Message;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.BitArray;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+public class RawFile {
+  private static final Log LOG = LogFactory.getLog(RawFile.class);
+
+  public static class RawFileScanner extends FileScanner implements SeekableScanner {
+    private FileChannel channel;
+    private DataType[] columnTypes;
+
+    private ByteBuffer buffer;
+    private ByteBuf buf;
+    private Tuple tuple;
+
+    private int headerSize = 0; // Header size of a tuple
+    private BitArray nullFlags;
+    private static final int RECORD_SIZE = 4;
+    private boolean eos = false;
+    private long startOffset;
+    private long endOffset;
+    private FileInputStream fis;
+    private long recordCount;
+    private long totalReadBytes;
+    private long filePosition;
+    private boolean forceFillBuffer;
+
+    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
+      super(conf, schema, meta, fragment);
+    }
+
+    public void init() throws IOException {
+      File file;
+      try {
+        if (fragment.getPath().toUri().getScheme() != null) {
+          file = new File(fragment.getPath().toUri());
+        } else {
+          file = new File(fragment.getPath().toString());
+        }
+      } catch (IllegalArgumentException iae) {
+        throw new IOException(iae);
+      }
+      fis = new FileInputStream(file);
+      channel = fis.getChannel();
+      filePosition = startOffset = fragment.getStartKey();
+      endOffset = fragment.getStartKey() + fragment.getLength();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RawFileScanner open: " + fragment + ", " + channel.position() + ", file size: " + channel.size()
+            + ", fragment length: " + fragment.getLength());
+      }
+
+      buf = BufferPool.directBuffer(64 * StorageUnit.KB);
+      buffer = buf.nioBuffer(0, buf.capacity());
+
+      columnTypes = new DataType[schema.size()];
+      for (int i = 0; i < schema.size(); i++) {
+        columnTypes[i] = schema.getColumn(i).getDataType();
+      }
+
+      tuple = new VTuple(columnTypes.length);
+      nullFlags = new BitArray(schema.size());
+      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // the middle 2 bytes hold the null-flag byte length
+
+      // set the initial read position
+      if (fragment.getStartKey() > 0) {
+        channel.position(fragment.getStartKey());
+      }
+
+      forceFillBuffer = true;
+      super.init();
+    }
+
+    @Override
+    public long getNextOffset() throws IOException {
+      return filePosition - (forceFillBuffer ? 0 : buffer.remaining());
+    }
+
+    @Override
+    public void seek(long offset) throws IOException {
+      eos = false;
+      filePosition = channel.position();
+
+      // do not fill the buffer if the offset is already included in the buffer.
+      if(!forceFillBuffer && filePosition > offset && offset > filePosition - buffer.limit()){
+        buffer.position((int)(offset - (filePosition - buffer.limit())));
+      } else {
+        if(offset < startOffset || offset > startOffset + fragment.getLength()){
+          throw new IndexOutOfBoundsException(String.format("range(%d, %d), offset: %d",
+              startOffset, startOffset + fragment.getLength(), offset));
+        }
+        channel.position(offset);
+        filePosition = offset;
+        buffer.clear();
+        forceFillBuffer = true;
+        fillBuffer();
+      }
+    }
+
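+    /**
+     * Refill the buffer from the channel. Unless a full refill is forced,
+     * unread bytes are first shifted to the front via compact(). Returns
+     * false and sets eos once the channel is exhausted.
+     */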
+    private boolean fillBuffer() throws IOException {
+      if(!forceFillBuffer) buffer.compact();
+
+      int bytesRead = channel.read(buffer);
+      forceFillBuffer = false;
+      if (bytesRead == -1) {
+        eos = true;
+        return false;
+      } else {
+        buffer.flip(); // flip(): the limit is set to the current position, and the position to zero
+        filePosition += bytesRead;
+        totalReadBytes += bytesRead;
+        return true;
+      }
+    }
+
+    /**
+     * Decode a ZigZag-encoded 32-bit value.  ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint.  (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n An unsigned 32-bit integer, stored in a signed int because
+     *          Java has no explicit unsigned support.
+     * @return A signed 32-bit integer.
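+     *         For example, encoded inputs 0, 1, 2, 3, 4 decode to
+     *         0, -1, 1, -2, 2.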
+     */
+    public static int decodeZigZag32(final int n) {
+      return (n >>> 1) ^ -(n & 1);
+    }
+
+    /**
+     * Decode a ZigZag-encoded 64-bit value.  ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint.  (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n An unsigned 64-bit integer, stored in a signed long because
+     *          Java has no explicit unsigned support.
+     * @return A signed 64-bit integer.
+     */
+    public static long decodeZigZag64(final long n) {
+      return (n >>> 1) ^ -(n & 1);
+    }
+
+    /**
+     * Read a raw varint from the buffer.  If larger than 32 bits, discard the
+     * upper bits.
+     */
+    public int readRawVarint32() throws IOException {
+      byte tmp = buffer.get();
+      if (tmp >= 0) {
+        return tmp;
+      }
+      int result = tmp & 0x7f;
+      if ((tmp = buffer.get()) >= 0) {
+        result |= tmp << 7;
+      } else {
+        result |= (tmp & 0x7f) << 7;
+        if ((tmp = buffer.get()) >= 0) {
+          result |= tmp << 14;
+        } else {
+          result |= (tmp & 0x7f) << 14;
+          if ((tmp = buffer.get()) >= 0) {
+            result |= tmp << 21;
+          } else {
+            result |= (tmp & 0x7f) << 21;
+            result |= (tmp = buffer.get()) << 28;
+            if (tmp < 0) {
+              // Discard upper 32 bits.
+              for (int i = 0; i < 5; i++) {
+                if (buffer.get() >= 0) {
+                  return result;
+                }
+              }
+              throw new IOException("Invalid Variable int32");
+            }
+          }
+        }
+      }
+      return result;
+    }
+
+    /** Read a raw varint from the buffer. */
+    public long readRawVarint64() throws IOException {
+      int shift = 0;
+      long result = 0;
+      while (shift < 64) {
+        final byte b = buffer.get();
+        result |= (long)(b & 0x7F) << shift;
+        if ((b & 0x80) == 0) {
+          return result;
+        }
+        shift += 7;
+      }
+      throw new IOException("Invalid Variable int64");
+    }
+
+    @Override
+    public Tuple next() throws IOException {
+      if(eos) return null;
+
+      if (forceFillBuffer || buffer.remaining() < headerSize) {
+        if (!fillBuffer()) {
+          return null;
+        }
+      }
+
+      // back up the buffer limit before narrowing it to read the null flags
+      int bufferLimit = buffer.limit();
+      int recordSize = buffer.getInt();
+      int nullFlagSize = buffer.getShort();
+
+      buffer.limit(buffer.position() + nullFlagSize);
+      nullFlags.fromByteBuffer(buffer);
+      // restore the start of record contents
+      buffer.limit(bufferLimit);
+      if (buffer.remaining() < (recordSize - headerSize)) {
+
+        // if the record does not fit in the remaining buffer, grow the buffer to hold it
+        reSizeBuffer(recordSize);
+
+        if (!fillBuffer()) {
+          return null;
+        }
+      }
+
+      for (int i = 0; i < columnTypes.length; i++) {
+        // check if the i'th column is null
+        if (nullFlags.get(i)) {
+          tuple.put(i, DatumFactory.createNullDatum());
+          continue;
+        }
+
+        switch (columnTypes[i].getType()) {
+          case BOOLEAN :
+            tuple.put(i, DatumFactory.createBool(buffer.get()));
+            break;
+
+          case BIT :
+            tuple.put(i, DatumFactory.createBit(buffer.get()));
+            break;
+
+          case CHAR :
+            int realLen = readRawVarint32();
+            byte[] buf = new byte[realLen];
+            buffer.get(buf);
+            tuple.put(i, DatumFactory.createChar(buf));
+            break;
+
+          case INT2 :
+            tuple.put(i, DatumFactory.createInt2(buffer.getShort()));
+            break;
+
+          case INT4 :
+            tuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32())));
+            break;
+
+          case INT8 :
+            tuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64())));
+            break;
+
+          case FLOAT4 :
+            tuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
+            break;
+
+          case FLOAT8 :
+            tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
+            break;
+
+          case TEXT : {
+            int len = readRawVarint32();
+            byte [] strBytes = new byte[len];
+            buffer.get(strBytes);
+            tuple.put(i, DatumFactory.createText(strBytes));
+            break;
+          }
+
+          case BLOB : {
+            int len = readRawVarint32();
+            byte [] rawBytes = new byte[len];
+            buffer.get(rawBytes);
+            tuple.put(i, DatumFactory.createBlob(rawBytes));
+            break;
+          }
+
+          case PROTOBUF: {
+            int len = readRawVarint32();
+            byte [] rawBytes = new byte[len];
+            buffer.get(rawBytes);
+
+            ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]);
+            Message.Builder builder = factory.newBuilder();
+            builder.mergeFrom(rawBytes);
+            tuple.put(i, factory.createDatum(builder.build()));
+            break;
+          }
+
+          case INET4 :
+            byte [] ipv4Bytes = new byte[4];
+            buffer.get(ipv4Bytes);
+            tuple.put(i, DatumFactory.createInet4(ipv4Bytes));
+            break;
+
+          case DATE: {
+            int val = buffer.getInt();
+            if (val < Integer.MIN_VALUE + 1) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            } else {
+              tuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val));
+            }
+            break;
+          }
+          case TIME:
+          case TIMESTAMP: {
+            long val = buffer.getLong();
+            if (val < Long.MIN_VALUE + 1) {
+              tuple.put(i, DatumFactory.createNullDatum());
+            } else {
+              tuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val));
+            }
+            break;
+          }
+          case NULL_TYPE:
+            tuple.put(i, NullDatum.get());
+            break;
+
+          default:
+        }
+      }
+
+      recordCount++;
+
+      if(filePosition - buffer.remaining() >= endOffset){
+        eos = true;
+      }
+      return new VTuple(tuple);
+    }
+
+    private void reSizeBuffer(int writableBytes){
+      if (buffer.capacity() - buffer.remaining()  <  writableBytes) {
+        buf.setIndex(buffer.position(), buffer.limit());
+        buf.markReaderIndex();
+        buf.discardSomeReadBytes();
+        buf.ensureWritable(writableBytes);
+        buffer = buf.nioBuffer(0, buf.capacity());
+        buffer.limit(buf.writerIndex());
+      }
+    }
+
+    @Override
+    public void reset() throws IOException {
+      // reset the buffer
+      buffer.clear();
+      forceFillBuffer = true;
+      filePosition = fragment.getStartKey();
+      channel.position(filePosition);
+      eos = false;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if(buf != null){
+        buffer.clear();
+        buffer = null;
+
+        buf.release();
+        buf = null;
+      }
+
+      IOUtils.cleanup(LOG, channel, fis);
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSplittable(){
+      return false;
+    }
+
+    @Override
+    public TableStats getInputStats() {
+      if(tableStats != null){
+        tableStats.setNumRows(recordCount);
+        tableStats.setReadBytes(totalReadBytes); // actual read bytes (scan + rescan * n)
+        tableStats.setNumBytes(fragment.getLength());
+      }
+      return tableStats;
+    }
+
+    @Override
+    public float getProgress() {
+      if(eos) {
+        return 1.0f;
+      }
+
+      if (filePosition - startOffset == 0) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, ((float) filePosition / endOffset));
+      }
+    }
+  }
+
+  public static class RawFileAppender extends FileAppender {
+    private FileChannel channel;
+    private RandomAccessFile randomAccessFile;
+    private DataType[] columnTypes;
+
+    private ByteBuffer buffer;
+    private ByteBuf buf;
+    private BitArray nullFlags;
+    private int headerSize = 0;
+    private static final int RECORD_SIZE = 4;
+    private long pos;
+
+    private TableStatistics stats;
+
+    public RawFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+                           Schema schema, TableMeta meta, Path workDir) throws IOException {
+      super(conf, taskAttemptId, schema, meta, workDir);
+    }
+
+    public void init() throws IOException {
+      File file;
+      try {
+        if (path.toUri().getScheme() != null) {
+          file = new File(path.toUri());
+        } else {
+          file = new File(path.toString());
+        }
+      } catch (IllegalArgumentException iae) {
+        throw new IOException(iae);
+      }
+
+      randomAccessFile = new RandomAccessFile(file, "rw");
+      channel = randomAccessFile.getChannel();
+      pos = 0;
+
+      columnTypes = new DataType[schema.size()];
+      for (int i = 0; i < schema.size(); i++) {
+        columnTypes[i] = schema.getColumn(i).getDataType();
+      }
+
+      buf = BufferPool.directBuffer(64 * StorageUnit.KB);
+      buffer = buf.nioBuffer(0, buf.capacity());
+
+      // compute the number of bytes representing the null flags
+      nullFlags = new BitArray(schema.size());
+      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
+
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+
+      super.init();
+    }
+
+    @Override
+    public long getOffset() throws IOException {
+      return pos;
+    }
+
+    private void flushBuffer() throws IOException {
+      buffer.flip();
+      channel.write(buffer);
+      buffer.clear();
+    }
+
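+    /**
+     * If fewer than sizeToBeWritten bytes remain, write every complete
+     * record before recordOffset to the channel and slide the current,
+     * partial record to the front of the buffer. Returns true when the
+     * caller must treat offset 0 as the new record start.
+     */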
+    private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten)
+        throws IOException {
+
+      // if the buffer reaches the limit,
+      // write the bytes from 0 to the previous record.
+      if (buffer.remaining() < sizeToBeWritten) {
+
+        int limit = buffer.position();
+        buffer.limit(recordOffset);
+        buffer.flip();
+        channel.write(buffer);
+        buffer.position(recordOffset);
+        buffer.limit(limit);
+        buffer.compact();
+
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    /**
+     * ZigZag-encode a signed 32-bit value.  ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint.  (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n A signed 32-bit integer.
+     * @return An unsigned 32-bit integer, stored in a signed int because
+     *         Java has no explicit unsigned support.
+     */
+    public static int encodeZigZag32(final int n) {
+      // Note:  the right-shift must be arithmetic
+      return (n << 1) ^ (n >> 31);
+    }
+
+    /**
+     * ZigZag-encode a signed 64-bit value.  ZigZag encodes signed integers
+     * into values that can be efficiently encoded with varint.  (Otherwise,
+     * negative values must be sign-extended to 64 bits to be varint encoded,
+     * thus always taking 10 bytes on the wire.)
+     *
+     * @param n A signed 64-bit integer.
+     * @return An unsigned 64-bit integer, stored in a signed long because
+     *         Java has no explicit unsigned support.
+     */
+    public static long encodeZigZag64(final long n) {
+      // Note:  the right-shift must be arithmetic
+      return (n << 1) ^ (n >> 63);
+    }
+
+    /**
+     * Encode and write a varint.  {@code value} is treated as
+     * unsigned, so it won't be sign-extended if negative.
+     */
+    public void writeRawVarint32(int value) throws IOException {
+      while (true) {
+        if ((value & ~0x7F) == 0) {
+          buffer.put((byte) value);
+          return;
+        } else {
+          buffer.put((byte) ((value & 0x7F) | 0x80));
+          value >>>= 7;
+        }
+      }
+    }
+
+    /**
+     * Compute the number of bytes that would be needed to encode a varint.
+     * {@code value} is treated as unsigned, so it won't be sign-extended if
+     * negative.
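+     * For example, 0..127 take one byte and 128..16383 take two.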
+     */
+    public static int computeRawVarint32Size(final int value) {
+      if ((value & (0xffffffff <<  7)) == 0) return 1;
+      if ((value & (0xffffffff << 14)) == 0) return 2;
+      if ((value & (0xffffffff << 21)) == 0) return 3;
+      if ((value & (0xffffffff << 28)) == 0) return 4;
+      return 5;
+    }
+
+    /** Encode and write a varint. */
+    public void writeRawVarint64(long value) throws IOException {
+      while (true) {
+        if ((value & ~0x7FL) == 0) {
+          buffer.put((byte) value);
+          return;
+        } else {
+          buffer.put((byte) ((value & 0x7F) | 0x80));
+          value >>>= 7;
+        }
+      }
+    }
+
+    @Override
+    public void addTuple(Tuple t) throws IOException {
+
+      if (buffer.remaining() < headerSize) {
+        flushBuffer();
+      }
+
+      // skip the row header
+      int recordOffset = buffer.position();
+      buffer.position(recordOffset + headerSize);
+      // reset the null flags
+      nullFlags.clear();
+      for (int i = 0; i < schema.size(); i++) {
+        if (enabledStats) {
+          stats.analyzeField(i, t.get(i));
+        }
+
+        if (t.isNull(i)) {
+          nullFlags.set(i);
+          continue;
+        }
+
+        // 8 bytes is the maximum size of any fixed-length type
+        if (flushBufferAndReplace(recordOffset, 8)) {
+          recordOffset = 0;
+        }
+
+        switch(columnTypes[i].getType()) {
+          case NULL_TYPE:
+            nullFlags.set(i);
+            continue;
+
+          case BOOLEAN:
+          case BIT:
+            buffer.put(t.getByte(i));
+            break;
+
+          case INT2 :
+            buffer.putShort(t.getInt2(i));
+            break;
+
+          case INT4 :
+            writeRawVarint32(encodeZigZag32(t.getInt4(i)));
+            break;
+
+          case INT8 :
+            writeRawVarint64(encodeZigZag64(t.getInt8(i)));
+            break;
+
+          case FLOAT4 :
+            buffer.putFloat(t.getFloat4(i));
+            break;
+
+          case FLOAT8 :
+            buffer.putDouble(t.getFloat8(i));
+            break;
+
+          case CHAR:
+          case TEXT: {
+            byte [] strBytes = t.getBytes(i);
+            if (flushBufferAndReplace(recordOffset, strBytes.length + computeRawVarint32Size(strBytes.length))) {
+              recordOffset = 0;
+            }
+            writeRawVarint32(strBytes.length);
+            buffer.put(strBytes);
+            break;
+          }
+
+          case DATE:
+            buffer.putInt(t.getInt4(i));
+            break;
+
+          case TIME:
+          case TIMESTAMP:
+            buffer.putLong(t.getInt8(i));
+            break;
+
+          case BLOB : {
+            byte [] rawBytes = t.getBytes(i);
+            if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
+              recordOffset = 0;
+            }
+            writeRawVarint32(rawBytes.length);
+            buffer.put(rawBytes);
+            break;
+          }
+
+          case PROTOBUF: {
+            byte [] rawBytes = t.getBytes(i);
+            if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
+              recordOffset = 0;
+            }
+            writeRawVarint32(rawBytes.length);
+            buffer.put(rawBytes);
+            break;
+          }
+
+          case INET4 :
+            buffer.put(t.getBytes(i));
+            break;
+
+          default:
+            throw new IOException("Cannot support data type: " + columnTypes[i].getType());
+        }
+      }
+
+      // write a record header
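+      // layout: [int record size][short null-flag length][null flags][field data]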
+      int bufferPos = buffer.position();
+      buffer.position(recordOffset);
+      buffer.putInt(bufferPos - recordOffset);
+      byte [] flags = nullFlags.toArray();
+      buffer.putShort((short) flags.length);
+      buffer.put(flags);
+
+      pos += bufferPos - recordOffset;
+      buffer.position(bufferPos);
+
+      if (enabledStats) {
+        stats.incrementRow();
+      }
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if(buffer != null){
+        flushBuffer();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      flush();
+      if (enabledStats) {
+        stats.setNumBytes(getOffset());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
+      }
+
+      if(buf != null){
+        buffer.clear();
+        buffer = null;
+
+        buf.release();
+        buf = null;
+      }
+
+      IOUtils.cleanup(LOG, channel, randomAccessFile);
+    }
+
+    @Override
+    public TableStats getStats() {
+      if (enabledStats) {
+        stats.setNumBytes(pos);
+        return stats.getTableStat();
+      } else {
+        return null;
+      }
+    }
+  }
+}

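For reference, the INT4/INT8 paths above pair ZigZag encoding with varints so
that small-magnitude negatives stay short on disk. Below is a minimal,
standalone sketch of that round trip; the encode/decode/varint bodies mirror
the patch, while the demo class, simplified reader, and harness are our own.

    import java.nio.ByteBuffer;

    public class ZigZagVarintDemo {
      static int encodeZigZag32(int n) { return (n << 1) ^ (n >> 31); }
      static int decodeZigZag32(int n) { return (n >>> 1) ^ -(n & 1); }

      // Write the low 7 bits per byte, setting the high bit on all but the last.
      static void writeRawVarint32(ByteBuffer buffer, int value) {
        while ((value & ~0x7F) != 0) {
          buffer.put((byte) ((value & 0x7F) | 0x80));
          value >>>= 7;
        }
        buffer.put((byte) value);
      }

      // Simplified reader: assumes a well-formed varint of at most 5 bytes.
      static int readRawVarint32(ByteBuffer buffer) {
        int result = 0, shift = 0;
        byte b;
        do {
          b = buffer.get();
          result |= (b & 0x7F) << shift;
          shift += 7;
        } while ((b & 0x80) != 0);
        return result;
      }

      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        writeRawVarint32(buf, encodeZigZag32(-300)); // 599, two bytes on the wire
        buf.flip();
        System.out.println(decodeZigZag32(readRawVarint32(buf))); // prints -300
      }
    }
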
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
new file mode 100644
index 0000000..8da6ada
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.BitArray;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+
+public class RowFile {
+  public static final Log LOG = LogFactory.getLog(RowFile.class);
+
+  private static final int SYNC_ESCAPE = -1;
+  private static final int SYNC_HASH_SIZE = 16;
+  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;
+  private final static int DEFAULT_BUFFER_SIZE = 65535;
+  public static int SYNC_INTERVAL;
+
+  public static class RowFileScanner extends FileScanner {
+    private FileSystem fs;
+    private FSDataInputStream in;
+    private Tuple tuple;
+
+    private byte[] sync = new byte[SYNC_HASH_SIZE];
+    private byte[] checkSync = new byte[SYNC_HASH_SIZE];
+    private long start, end;
+
+    private ByteBuffer buffer;
+    private final int tupleHeaderSize;
+    private BitArray nullFlags;
+    private long bufferStartPos;
+
+    public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment)
+        throws IOException {
+      super(conf, schema, meta, fragment);
+
+      SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
+          ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal) * SYNC_SIZE;
+
+      nullFlags = new BitArray(schema.size());
+      tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
+      this.start = this.fragment.getStartKey();
+      this.end = this.start + this.fragment.getLength();
+    }
+
+    public void init() throws IOException {
+      // open the fragment and allocate the read buffer
+      fs = fragment.getPath().getFileSystem(conf);
+      in = fs.open(fragment.getPath());
+      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * schema.size());
+      buffer.flip();
+
+      readHeader();
+
+      // find the correct position from the start
+      if (this.start > in.getPos()) {
+        long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0;
+        in.seek(realStart);
+      }
+      bufferStartPos = in.getPos();
+      fillBuffer();
+
+      if (start != 0) {
+        // TODO: improve
+        boolean syncFound = false;
+        while (!syncFound) {
+          if (buffer.remaining() < SYNC_SIZE) {
+            fillBuffer();
+          }
+          buffer.mark();
+          syncFound = checkSync();
+          if (!syncFound) {
+            buffer.reset();
+            buffer.get(); // proceed one byte
+          }
+        }
+        bufferStartPos += buffer.position();
+        buffer.compact();
+        buffer.flip();
+      }
+
+      super.init();
+    }
+
+    private void readHeader() throws IOException {
+      SYNC_INTERVAL = in.readInt();
+      StorageUtil.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
+    }
+
+    /**
+     * Check whether the sync escape and marker appear at the front of the buffer.
+     *
+     * @return true if the sync marker is found.
+     * @throws java.io.IOException
+     */
+    private boolean checkSync() throws IOException {
+      buffer.getInt();                           // escape
+      buffer.get(checkSync, 0, SYNC_HASH_SIZE);  // sync
+      return Arrays.equals(checkSync, sync);
+    }
+
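+    /**
+     * Shift unread bytes to the front and top the buffer up from the
+     * stream, issuing a second read when the first does not fill the
+     * remaining space. Returns -1 once the stream is exhausted.
+     */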
+    private int fillBuffer() throws IOException {
+      bufferStartPos += buffer.position();
+      buffer.compact();
+      int remain = buffer.remaining();
+      int read = in.read(buffer);
+      if (read == -1) {
+        buffer.flip();
+        return read;
+      } else {
+        int totalRead = read;
+        if (remain > totalRead) {
+          read = in.read(buffer);
+          totalRead += read > 0 ? read : 0;
+        }
+        buffer.flip();
+        return totalRead;
+      }
+    }
+
+    @Override
+    public Tuple next() throws IOException {
+      while (buffer.remaining() < SYNC_SIZE) {
+        if (fillBuffer() < 0) {
+          return null;
+        }
+      }
+
+      buffer.mark();
+      if (!checkSync()) {
+        buffer.reset();
+      } else {
+        if (bufferStartPos + buffer.position() > end) {
+          return null;
+        }
+      }
+
+      while (buffer.remaining() < tupleHeaderSize) {
+        if (fillBuffer() < 0) {
+          return null;
+        }
+      }
+
+      int i;
+      tuple = new VTuple(schema.size());
+
+      int nullFlagSize = buffer.getShort();
+      byte[] nullFlagBytes = new byte[nullFlagSize];
+      buffer.get(nullFlagBytes, 0, nullFlagSize);
+      nullFlags = new BitArray(nullFlagBytes);
+      int tupleSize = buffer.getShort();
+
+      while (buffer.remaining() < (tupleSize)) {
+        if (fillBuffer() < 0) {
+          return null;
+        }
+      }
+
+      Datum datum;
+      Column col;
+      for (i = 0; i < schema.size(); i++) {
+        if (!nullFlags.get(i)) {
+          col = schema.getColumn(i);
+          switch (col.getDataType().getType()) {
+            case BOOLEAN :
+              datum = DatumFactory.createBool(buffer.get());
+              tuple.put(i, datum);
+              break;
+
+            case BIT:
+              datum = DatumFactory.createBit(buffer.get());
+              tuple.put(i, datum );
+              break;
+
+            case CHAR :
+              int realLen = buffer.getInt();
+              byte[] buf = new byte[col.getDataType().getLength()];
+              buffer.get(buf);
+              byte[] charBuf = Arrays.copyOf(buf, realLen);
+              tuple.put(i, DatumFactory.createChar(charBuf));
+              break;
+
+            case INT2 :
+              datum = DatumFactory.createInt2(buffer.getShort());
+              tuple.put(i, datum );
+              break;
+
+            case INT4 :
+              datum = DatumFactory.createInt4(buffer.getInt());
+              tuple.put(i, datum );
+              break;
+
+            case INT8 :
+              datum = DatumFactory.createInt8(buffer.getLong());
+              tuple.put(i, datum );
+              break;
+
+            case FLOAT4 :
+              datum = DatumFactory.createFloat4(buffer.getFloat());
+              tuple.put(i, datum);
+              break;
+
+            case FLOAT8 :
+              datum = DatumFactory.createFloat8(buffer.getDouble());
+              tuple.put(i, datum);
+              break;
+
+            case TEXT:
+              short bytelen = buffer.getShort();
+              byte[] strbytes = new byte[bytelen];
+              buffer.get(strbytes, 0, bytelen);
+              datum = DatumFactory.createText(strbytes);
+              tuple.put(i, datum);
+              break;
+
+            case BLOB:
+              short bytesLen = buffer.getShort();
+              byte [] bytesBuf = new byte[bytesLen];
+              buffer.get(bytesBuf);
+              datum = DatumFactory.createBlob(bytesBuf);
+              tuple.put(i, datum);
+              break;
+
+            case INET4 :
+              byte[] ipv4 = new byte[4];
+              buffer.get(ipv4, 0, 4);
+              datum = DatumFactory.createInet4(ipv4);
+              tuple.put(i, datum);
+              break;
+
+            default:
+              break;
+          }
+        } else {
+          tuple.put(i, DatumFactory.createNullDatum());
+        }
+      }
+      return tuple;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      init();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (in != null) {
+        in.close();
+      }
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSplittable(){
+      return true;
+    }
+  }
+
+  public static class RowFileAppender extends FileAppender {
+    private FSDataOutputStream out;
+    private long lastSyncPos;
+    private FileSystem fs;
+    private byte[] sync;
+    private ByteBuffer buffer;
+
+    private BitArray nullFlags;
+    // statistics
+    private TableStatistics stats;
+
+    public RowFileAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
+                           final Schema schema, final TableMeta meta, final Path workDir)
+        throws IOException {
+      super(conf, taskAttemptId, schema, meta, workDir);
+    }
+
+    public void init() throws IOException {
+      SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
+          ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal);
+      fs = path.getFileSystem(conf);
+
+      if (!fs.exists(path.getParent())) {
+        throw new FileNotFoundException(path.toString());
+      }
+
+      if (fs.exists(path)) {
+        throw new AlreadyExistsStorageException(path);
+      }
+
+      sync = new byte[SYNC_HASH_SIZE];
+      lastSyncPos = 0;
+
+      out = fs.create(path);
+
+      MessageDigest md;
+      try {
+        md = MessageDigest.getInstance("MD5");
+        md.update((path.toString()+System.currentTimeMillis()).getBytes());
+        sync = md.digest();
+      } catch (NoSuchAlgorithmException e) {
+        LOG.error(e);
+      }
+
+      writeHeader();
+
+      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+
+      nullFlags = new BitArray(schema.size());
+
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+    }
+
+    private void writeHeader() throws IOException {
+      out.writeInt(SYNC_INTERVAL);
+      out.write(sync);
+      out.flush();
+      lastSyncPos = out.getPos();
+    }
+
+    @Override
+    public void addTuple(Tuple t) throws IOException {
+      checkAndWriteSync();
+      Column col;
+
+      buffer.clear();
+      nullFlags.clear();
+
+      for (int i = 0; i < schema.size(); i++) {
+        if (enabledStats) {
+          stats.analyzeField(i, t.get(i));
+        }
+
+        if (t.isNull(i)) {
+          nullFlags.set(i);
+        } else {
+          col = schema.getColumn(i);
+          switch (col.getDataType().getType()) {
+            case BOOLEAN:
+              buffer.put(t.get(i).asByte());
+              break;
+            case BIT:
+              buffer.put(t.get(i).asByte());
+              break;
+            case CHAR:
+              byte[] src = t.get(i).asByteArray();
+              byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
+              buffer.putInt(src.length);
+              buffer.put(dst);
+              break;
+            case TEXT:
+              byte [] strbytes = t.get(i).asByteArray();
+              buffer.putShort((short)strbytes.length);
+              buffer.put(strbytes, 0, strbytes.length);
+              break;
+            case INT2:
+              buffer.putShort(t.get(i).asInt2());
+              break;
+            case INT4:
+              buffer.putInt(t.get(i).asInt4());
+              break;
+            case INT8:
+              buffer.putLong(t.get(i).asInt8());
+              break;
+            case FLOAT4:
+              buffer.putFloat(t.get(i).asFloat4());
+              break;
+            case FLOAT8:
+              buffer.putDouble(t.get(i).asFloat8());
+              break;
+            case BLOB:
+              byte [] bytes = t.get(i).asByteArray();
+              buffer.putShort((short)bytes.length);
+              buffer.put(bytes);
+              break;
+            case INET4:
+              buffer.put(t.get(i).asByteArray());
+              break;
+            case INET6:
+              buffer.put(t.get(i).asByteArray());
+              break;
+            case NULL_TYPE:
+              nullFlags.set(i);
+              break;
+            default:
+              break;
+          }
+        }
+      }
+
+      byte[] bytes = nullFlags.toArray();
+      out.writeShort(bytes.length);
+      out.write(bytes);
+
+      bytes = buffer.array();
+      int dataLen = buffer.position();
+      out.writeShort(dataLen);
+      out.write(bytes, 0, dataLen);
+
+      // update statistics
+      if (enabledStats) {
+        stats.incrementRow();
+      }
+    }
+
+    @Override
+    public long getOffset() throws IOException {
+      return out.getPos();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (out != null) {
+        if (enabledStats) {
+          stats.setNumBytes(out.getPos());
+        }
+        sync();
+        out.flush();
+        out.close();
+      }
+    }
+
+    private void sync() throws IOException {
+      if (lastSyncPos != out.getPos()) {
+        out.writeInt(SYNC_ESCAPE);
+        out.write(sync);
+        lastSyncPos = out.getPos();
+      }
+    }
+
+    private void checkAndWriteSync() throws IOException {
+      if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
+        sync();
+      }
+    }
+
+    @Override
+    public TableStats getStats() {
+      if (enabledStats) {
+        return stats.getTableStat();
+      } else {
+        return null;
+      }
+    }
+  }
+}

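RowFile's split handling hinges on its 16-byte sync marker: the appender
digests the output path plus the current time with MD5, writes the marker in
the header and again every SYNC_INTERVAL bytes behind a -1 escape int, and a
scanner starting mid-file slides forward one byte at a time until the marker
matches. A minimal sketch of that resynchronization under the same
assumptions (the toy stream and class name are our own):

    import java.nio.ByteBuffer;
    import java.security.MessageDigest;
    import java.util.Arrays;

    public class SyncMarkerDemo {
      public static void main(String[] args) throws Exception {
        byte[] sync = MessageDigest.getInstance("MD5")
            .digest(("/tmp/demo" + System.currentTimeMillis()).getBytes());

        // Toy stream: 3 record bytes, then the escape int (-1) and the marker.
        ByteBuffer stream = ByteBuffer.allocate(3 + 4 + 16);
        stream.put(new byte[]{10, 20, 30}).putInt(-1).put(sync);

        // Slide forward one byte at a time, as RowFileScanner.init() does.
        byte[] candidate = new byte[16];
        for (int off = 0; off + 20 <= stream.capacity(); off++) {
          stream.position(off);
          if (stream.getInt() == -1) {
            stream.get(candidate);
            if (Arrays.equals(candidate, sync)) {
              System.out.println("sync found at offset " + off); // prints 3
              break;
            }
          }
        }
      }
    }
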
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java
new file mode 100644
index 0000000..3579674
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SplitLineReader extends LineReader {
+  public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
+    super(in, recordDelimiterBytes);
+  }
+
+  public SplitLineReader(InputStream in, Configuration conf,
+                         byte[] recordDelimiterBytes) throws IOException {
+    super(in, conf, recordDelimiterBytes);
+  }
+
+  public boolean needAdditionalRecordAfterSplit() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
new file mode 100644
index 0000000..dbb8bd0
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.FileAppender;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * FileAppender for writing to Avro files.
+ */
+public class AvroAppender extends FileAppender {
+  private TableStatistics stats;
+  private Schema avroSchema;
+  private List<Schema.Field> avroFields;
+  private DataFileWriter<GenericRecord> dataFileWriter;
+
+  /**
+   * Creates a new AvroAppender.
+   *
+   * @param conf Configuration properties.
+   * @param taskAttemptId The task attempt id.
+   * @param schema The table schema.
+   * @param meta The table metadata.
+   * @param workDir The path of the Avro file to write to.
+   */
+  public AvroAppender(Configuration conf,
+                      QueryUnitAttemptId taskAttemptId,
+                      org.apache.tajo.catalog.Schema schema,
+                      TableMeta meta, Path workDir) throws IOException {
+    super(conf, taskAttemptId, schema, meta, workDir);
+  }
+
+  /**
+   * Initializes the Appender.
+   */
+  public void init() throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    if (!fs.exists(path.getParent())) {
+      throw new FileNotFoundException(path.toString());
+    }
+    FSDataOutputStream outputStream = fs.create(path);
+
+    avroSchema = AvroUtil.getAvroSchema(meta, conf);
+    avroFields = avroSchema.getFields();
+
+    DatumWriter<GenericRecord> datumWriter =
+        new GenericDatumWriter<GenericRecord>(avroSchema);
+    dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
+    dataFileWriter.create(avroSchema, outputStream);
+
+    if (enabledStats) {
+      this.stats = new TableStatistics(schema);
+    }
+    super.init();
+  }
+
+  /**
+   * Gets the current offset. Tracking offsets is currently not implemented, so
+   * this method always returns 0.
+   *
+   * @return 0
+   */
+  @Override
+  public long getOffset() throws IOException {
+    return 0;
+  }
+
+  private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) {
+    if (tuple.get(i) instanceof NullDatum) {
+      return null;
+    }
+    switch (avroType) {
+      case NULL:
+        return null;
+      case BOOLEAN:
+        return tuple.getBool(i);
+      case INT:
+        return tuple.getInt4(i);
+      case LONG:
+        return tuple.getInt8(i);
+      case FLOAT:
+        return tuple.getFloat4(i);
+      case DOUBLE:
+        return tuple.getFloat8(i);
+      case BYTES:
+      case FIXED:
+        return ByteBuffer.wrap(tuple.getBytes(i));
+      case STRING:
+        return tuple.getText(i);
+      default:
+        throw new RuntimeException("Unknown primitive type.");
+    }
+  }
+
+  /**
+   * Write a Tuple to the Avro file.
+   *
+   * @param tuple The Tuple to write.
+   */
+  @Override
+  public void addTuple(Tuple tuple) throws IOException {
+    GenericRecord record = new GenericData.Record(avroSchema);
+    for (int i = 0; i < schema.size(); ++i) {
+      Column column = schema.getColumn(i);
+      if (enabledStats) {
+        stats.analyzeField(i, tuple.get(i));
+      }
+      Object value;
+      Schema.Field avroField = avroFields.get(i);
+      Schema.Type avroType = avroField.schema().getType();
+      switch (avroType) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+        case FIXED:
+          value = getPrimitive(tuple, i, avroType);
+          break;
+        case RECORD:
+          throw new RuntimeException("Avro RECORD not supported.");
+        case ENUM:
+          throw new RuntimeException("Avro ENUM not supported.");
+        case MAP:
+          throw new RuntimeException("Avro MAP not supported.");
+        case UNION:
+          List<Schema> schemas = avroField.schema().getTypes();
+          if (schemas.size() != 2) {
+            throw new RuntimeException("Avro UNION not supported.");
+          }
+          if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
+            value = getPrimitive(tuple, i, schemas.get(1).getType());
+          } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
+            value = getPrimitive(tuple, i, schemas.get(0).getType());
+          } else {
+            throw new RuntimeException("Avro UNION not supported.");
+          }
+          break;
+        default:
+          throw new RuntimeException("Unknown type: " + avroType);
+      }
+      record.put(i, value);
+    }
+    dataFileWriter.append(record);
+
+    if (enabledStats) {
+      stats.incrementRow();
+    }
+  }
+
+  /**
+   * Flushes the current state of the file.
+   */
+  @Override
+  public void flush() throws IOException {
+    dataFileWriter.flush();
+  }
+
+  /**
+   * Closes the Appender.
+   */
+  @Override
+  public void close() throws IOException {
+    dataFileWriter.close();
+  }
+
+  /**
+   * If table statistics collection is enabled, retrieves the table statistics.
+   *
+   * @return Table statistics if enabled or null otherwise.
+   */
+  @Override
+  public TableStats getStats() {
+    if (enabledStats) {
+      return stats.getTableStat();
+    } else {
+      return null;
+    }
+  }
+}

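AvroAppender accepts exactly one UNION shape: a two-branch union where one
branch is NULL, which is Avro's convention for a nullable field; any other
union is rejected. A minimal sketch of that shape using the stock Avro API
(the schema text and class name are our own):

    import org.apache.avro.Schema;
    import java.util.List;

    public class NullableUnionDemo {
      public static void main(String[] args) {
        Schema record = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"r\",\"fields\":"
            + "[{\"name\":\"a\",\"type\":[\"null\",\"int\"]}]}");
        List<Schema> branches = record.getFields().get(0).schema().getTypes();
        // AvroAppender writes the value as the non-null branch's type.
        System.out.println(branches.get(0).getType()); // NULL
        System.out.println(branches.get(1).getType()); // INT
      }
    }
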
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
new file mode 100644
index 0000000..51594df
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * FileScanner for reading Avro files
+ */
+public class AvroScanner extends FileScanner {
+  private Schema avroSchema;
+  private List<Schema.Field> avroFields;
+  private DataFileReader<GenericRecord> dataFileReader;
+  private int[] projectionMap;
+
+  /**
+   * Creates a new AvroScanner.
+   *
+   * @param conf Configuration properties.
+   * @param schema The table schema.
+   * @param meta The table metadata.
+   * @param fragment The file fragment to scan.
+   */
+  public AvroScanner(Configuration conf,
+                     final org.apache.tajo.catalog.Schema schema,
+                     final TableMeta meta, final Fragment fragment) {
+    super(conf, schema, meta, fragment);
+  }
+
+  /**
+   * Initializes the AvroScanner.
+   */
+  @Override
+  public void init() throws IOException {
+    if (targets == null) {
+      targets = schema.toArray();
+    }
+    prepareProjection(targets);
+
+    avroSchema = AvroUtil.getAvroSchema(meta, conf);
+    avroFields = avroSchema.getFields();
+
+    DatumReader<GenericRecord> datumReader =
+        new GenericDatumReader<GenericRecord>(avroSchema);
+    SeekableInput input = new FsInput(fragment.getPath(), conf);
+    dataFileReader = new DataFileReader<GenericRecord>(input, datumReader);
+    super.init();
+  }
+
+  private void prepareProjection(Column[] targets) {
+    projectionMap = new int[targets.length];
+    for (int i = 0; i < targets.length; ++i) {
+      projectionMap[i] = schema.getColumnId(targets[i].getQualifiedName());
+    }
+  }
+
+  private static String fromAvroString(Object value) {
+    if (value instanceof Utf8) {
+      Utf8 utf8 = (Utf8)value;
+      return utf8.toString();
+    }
+    return value.toString();
+  }
+
+  private static Schema getNonNull(Schema schema) {
+    if (!schema.getType().equals(Schema.Type.UNION)) {
+      return schema;
+    }
+    List<Schema> schemas = schema.getTypes();
+    if (schemas.size() != 2) {
+      return schema;
+    }
+    if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
+      return schemas.get(1);
+    } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
+      return schemas.get(0);
+    } else {
+      return schema;
+    }
+  }
+
+  private Datum convertInt(Object value, TajoDataTypes.Type tajoType) {
+    int intValue = (Integer)value;
+    switch (tajoType) {
+      case BIT:
+        return DatumFactory.createBit((byte)(intValue & 0xff));
+      case INT2:
+        return DatumFactory.createInt2((short)intValue);
+      default:
+        return DatumFactory.createInt4(intValue);
+    }
+  }
+
+  private Datum convertBytes(Object value, TajoDataTypes.Type tajoType,
+                             DataType dataType) {
+    ByteBuffer buffer = (ByteBuffer)value;
+    byte[] bytes = new byte[buffer.capacity()];
+    buffer.get(bytes, 0, bytes.length);
+    switch (tajoType) {
+      case INET4:
+        return DatumFactory.createInet4(bytes);
+      case PROTOBUF:
+        try {
+          ProtobufDatumFactory factory =
+              ProtobufDatumFactory.get(dataType.getCode());
+          Message.Builder builder = factory.newBuilder();
+          builder.mergeFrom(bytes);
+          return factory.createDatum(builder);
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException(e);
+        }
+      default:
+        return new BlobDatum(bytes);
+    }
+  }
+
+  private Datum convertString(Object value, TajoDataTypes.Type tajoType) {
+    switch (tajoType) {
+      case CHAR:
+        return DatumFactory.createChar(fromAvroString(value));
+      default:
+        return DatumFactory.createText(fromAvroString(value));
+    }
+  }
+
+  /**
+   * Reads the next Tuple from the Avro file.
+   *
+   * @return The next Tuple from the Avro file or null if end of file is
+   *         reached.
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (!dataFileReader.hasNext()) {
+      return null;
+    }
+
+    Tuple tuple = new VTuple(schema.size());
+    GenericRecord record = dataFileReader.next();
+    for (int i = 0; i < projectionMap.length; ++i) {
+      int columnIndex = projectionMap[i];
+      Object value = record.get(columnIndex);
+      if (value == null) {
+        tuple.put(columnIndex, NullDatum.get());
+        continue;
+      }
+
+      // Get Avro type.
+      Schema.Field avroField = avroFields.get(columnIndex);
+      Schema nonNullAvroSchema = getNonNull(avroField.schema());
+      Schema.Type avroType = nonNullAvroSchema.getType();
+
+      // Get Tajo type.
+      Column column = schema.getColumn(columnIndex);
+      DataType dataType = column.getDataType();
+      TajoDataTypes.Type tajoType = dataType.getType();
+      switch (avroType) {
+        case NULL:
+          tuple.put(columnIndex, NullDatum.get());
+          break;
+        case BOOLEAN:
+          tuple.put(columnIndex, DatumFactory.createBool((Boolean)value));
+          break;
+        case INT:
+          tuple.put(columnIndex, convertInt(value, tajoType));
+          break;
+        case LONG:
+          tuple.put(columnIndex, DatumFactory.createInt8((Long)value));
+          break;
+        case FLOAT:
+          tuple.put(columnIndex, DatumFactory.createFloat4((Float)value));
+          break;
+        case DOUBLE:
+          tuple.put(columnIndex, DatumFactory.createFloat8((Double)value));
+          break;
+        case BYTES:
+          tuple.put(columnIndex, convertBytes(value, tajoType, dataType));
+          break;
+        case STRING:
+          tuple.put(columnIndex, convertString(value, tajoType));
+          break;
+        case RECORD:
+          throw new RuntimeException("Avro RECORD not supported.");
+        case ENUM:
+          throw new RuntimeException("Avro ENUM not supported.");
+        case MAP:
+          throw new RuntimeException("Avro MAP not supported.");
+        case UNION:
+          throw new RuntimeException("Avro UNION not supported.");
+        case FIXED:
+          tuple.put(columnIndex, new BlobDatum(((GenericFixed)value).bytes()));
+          break;
+        default:
+          throw new RuntimeException("Unknown type.");
+      }
+    }
+    return tuple;
+  }
+
+  /**
+   * Resets the scanner. Currently a no-op.
+   */
+  @Override
+  public void reset() throws IOException {
+    // no-op: rewinding the Avro data file reader is not implemented
+  }
+
+  /**
+   * Closes the scanner.
+   */
+  @Override
+  public void close() throws IOException {
+    if (dataFileReader != null) {
+      dataFileReader.close();
+    }
+  }
+
+  /**
+   * Returns whether this scanner is projectable.
+   *
+   * @return true
+   */
+  @Override
+  public boolean isProjectable() {
+    return true;
+  }
+
+  /**
+   * Returns whether this scanner is selectable.
+   *
+   * @return false
+   */
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  /**
+   * Returns whether this scanner is splittable.
+   *
+   * @return false
+   */
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+}
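For reference, a minimal read loop over this scanner might look like the
sketch below. The class name and the setup of conf, tajoSchema, meta, and
fragment are assumed from the surrounding patch; process() is a hypothetical
downstream consumer.

    Scanner scanner = new AvroScanner(conf, tajoSchema, meta, fragment);
    scanner.init();
    Tuple t;
    while ((t = scanner.next()) != null) {
      // each projected column holds a typed Datum, or NullDatum for Avro
      // nulls, per the avroType switch in next() above
      process(t);
    }
    scanner.close();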

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
new file mode 100644
index 0000000..0d14c3d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.StorageConstants;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+public class AvroUtil {
+  public static Schema getAvroSchema(TableMeta meta, Configuration conf)
+      throws IOException {
+
+    boolean isSchemaLiteral = meta.containsOption(StorageConstants.AVRO_SCHEMA_LITERAL);
+    boolean isSchemaUrl = meta.containsOption(StorageConstants.AVRO_SCHEMA_URL);
+    if (!isSchemaLiteral && !isSchemaUrl) {
+      throw new RuntimeException("No Avro schema for table.");
+    }
+    if (isSchemaLiteral) {
+      String schema = meta.getOption(StorageConstants.AVRO_SCHEMA_LITERAL);
+      return new Schema.Parser().parse(schema);
+    }
+
+    String schemaURL = meta.getOption(StorageConstants.AVRO_SCHEMA_URL);
+    if (schemaURL.toLowerCase().startsWith("http")) {
+      return getAvroSchemaFromHttp(schemaURL);
+    } else {
+      return getAvroSchemaFromFileSystem(schemaURL, conf);
+    }
+  }
+
+  public static Schema getAvroSchemaFromHttp(String schemaURL) throws IOException {
+    InputStream inputStream = new URL(schemaURL).openStream();
+
+    try {
+      return new Schema.Parser().parse(inputStream);
+    } finally {
+      IOUtils.closeStream(inputStream);
+    }
+  }
+
+  public static Schema getAvroSchemaFromFileSystem(String schemaURL, Configuration conf) throws IOException {
+    Path schemaPath = new Path(schemaURL);
+    FileSystem fs = schemaPath.getFileSystem(conf);
+    FSDataInputStream inputStream = fs.open(schemaPath);
+
+    try {
+      return new Schema.Parser().parse(inputStream);
+    } finally {
+      IOUtils.closeStream(inputStream);
+    }
+  }
+}
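As a usage sketch, the helpers can be exercised directly; the schema
locations below are hypothetical. Note that getAvroSchema() gives
avro.schema.literal precedence when both the literal and URL options are set.

    // Fetch a schema over HTTP:
    Schema fromHttp =
        AvroUtil.getAvroSchemaFromHttp("http://example.com/schemas/user.avsc");

    // Or read it through any Hadoop FileSystem:
    Configuration conf = new Configuration();
    Schema fromFs =
        AvroUtil.getAvroSchemaFromFileSystem("hdfs:///schemas/user.avsc", conf);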

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/package-info.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/package-info.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/package-info.java
new file mode 100644
index 0000000..40d1545
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/package-info.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * <p>
+ * Provides read and write support for Avro files. Avro schemas are
+ * converted to Tajo schemas according to the following mapping of Avro
+ * and Tajo types:
+ * </p>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Avro type</th>
+ *     <th>Tajo type</th>
+ *   </tr>
+ *   <tr>
+ *     <td>NULL</td>
+ *     <td>NULL_TYPE</td>
+ *   </tr>
+ *   <tr>
+ *     <td>BOOLEAN</td>
+ *     <td>BOOLEAN</td>
+ *   </tr>
+ *   <tr>
+ *     <td>INT</td>
+ *     <td>INT4</td>
+ *   </tr>
+ *   <tr>
+ *     <td>LONG</td>
+ *     <td>INT8</td>
+ *   </tr>
+ *   <tr>
+ *     <td>FLOAT</td>
+ *     <td>FLOAT4</td>
+ *   </tr>
+ *   <tr>
+ *     <td>DOUBLE</td>
+ *     <td>FLOAT8</td>
+ *   </tr>
+ *   <tr>
+ *     <td>BYTES</td>
+ *     <td>BLOB</td>
+ *   </tr>
+ *   <tr>
+ *     <td>STRING</td>
+ *     <td>TEXT</td>
+ *   </tr>
+ *   <tr>
+ *     <td>FIXED</td>
+ *     <td>BLOB</td>
+ *   </tr>
+ *   <tr>
+ *     <td>RECORD</td>
+ *     <td>Not currently supported.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>ENUM</td>
+ *     <td>Not currently supported.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>MAP</td>
+ *     <td>Not currently supported.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>UNION</td>
+ *     <td>Not currently supported.</td>
+ *   </tr>
+ * </table>
+ */
+
+package org.apache.tajo.storage.avro;
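To make the mapping concrete, the hypothetical record schema below would
surface in Tajo as shown in the trailing comments. A two-branch union with
null is unwrapped by the scanner, so nullable columns work even though
general UNIONs are not supported.

    String avsc =
        "{ \"type\": \"record\", \"name\": \"example\", \"fields\": ["
      + "  { \"name\": \"id\",      \"type\": \"long\" },"
      + "  { \"name\": \"name\",    \"type\": [\"null\", \"string\"] },"
      + "  { \"name\": \"payload\", \"type\": \"bytes\" } ] }";
    Schema schema = new Schema.Parser().parse(avsc);
    // id      -> INT8
    // name    -> TEXT (nullable; the null/string union is unwrapped)
    // payload -> BLOB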

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
new file mode 100644
index 0000000..ac413ca
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.storage.StorageFragmentProtos.*;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable {
+  @Expose private String tableName; // required
+  @Expose private Path uri; // required
+  @Expose public Long startOffset; // required
+  @Expose public Long length; // required
+
+  private String[] hosts; // Datanode hostnames
+  @Expose private int[] diskIds;
+
+  public FileFragment(ByteString raw) throws InvalidProtocolBufferException {
+    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
+    builder.mergeFrom(raw);
+    init(builder.build());
+  }
+
+  public FileFragment(String tableName, Path uri, BlockLocation blockLocation)
+      throws IOException {
+    this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null);
+  }
+
+  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, int[] diskIds) {
+    this.set(tableName, uri, start, length, hosts, diskIds);
+  }
+
+  // Non-splittable fragment; no disk ids are recorded.
+  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) {
+    this.set(tableName, uri, start, length, hosts, null);
+  }
+
+  public FileFragment(String fragmentId, Path path, long start, long length) {
+    this.set(fragmentId, path, start, length, null, null);
+  }
+
+  public FileFragment(FileFragmentProto proto) {
+    init(proto);
+  }
+
+  private void init(FileFragmentProto proto) {
+    int[] diskIds = new int[proto.getDiskIdsList().size()];
+    int i = 0;
+    for(Integer eachValue: proto.getDiskIdsList()) {
+      diskIds[i++] = eachValue;
+    }
+    this.set(proto.getId(), new Path(proto.getPath()),
+        proto.getStartOffset(), proto.getLength(),
+        proto.getHostsList().toArray(new String[]{}),
+        diskIds);
+  }
+
+  private void set(String tableName, Path path, long start,
+      long length, String[] hosts, int[] diskIds) {
+    this.tableName = tableName;
+    this.uri = path;
+    this.startOffset = start;
+    this.length = length;
+    this.hosts = hosts;
+    this.diskIds = diskIds;
+  }
+
+  /**
+   * Returns the hostnames of the datanodes hosting this block.
+   */
+  public String[] getHosts() {
+    if (hosts == null) {
+      this.hosts = new String[0];
+    }
+    return hosts;
+  }
+
+  /**
+   * Returns the disk ids of this block's replicas.
+   * An unknown disk is -1; known disks are numbered 0 through N.
+   */
+  public int[] getDiskIds() {
+    if (diskIds == null) {
+      this.diskIds = new int[getHosts().length];
+      Arrays.fill(this.diskIds, -1);
+    }
+    return diskIds;
+  }
+
+  public void setDiskIds(int[] diskIds){
+    this.diskIds = diskIds;
+  }
+
+  @Override
+  public String getTableName() {
+    return this.tableName;
+  }
+
+  public Path getPath() {
+    return this.uri;
+  }
+
+  public void setPath(Path path) {
+    this.uri = path;
+  }
+
+  public Long getStartKey() {
+    return this.startOffset;
+  }
+
+  @Override
+  public String getKey() {
+    return this.uri.toString();
+  }
+
+  @Override
+  public long getLength() {
+    return this.length;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return this.length <= 0;
+  }
+
+  /**
+   * The offset ranges of fragments within the same file <b>MUST NOT</b>
+   * overlap.
+   *
+   * @param t the fragment to compare against
+   * @return the sign of the start-offset difference, or -1 if the fragments
+   *         refer to different paths
+   */
+  @Override
+  public int compareTo(FileFragment t) {
+    if (getPath().equals(t.getPath())) {
+      long diff = this.getStartKey() - t.getStartKey();
+      if (diff < 0) {
+        return -1;
+      } else if (diff > 0) {
+        return 1;
+      } else {
+        return 0;
+      }
+    } else {
+      return -1;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof FileFragment) {
+      FileFragment t = (FileFragment) o;
+      if (getPath().equals(t.getPath())
+          && TUtil.checkEquals(t.getStartKey(), this.getStartKey())
+          && TUtil.checkEquals(t.getLength(), this.getLength())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(tableName, uri, startOffset, length);
+  }
+  
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    // super.clone() already copies every field, and none of them needs a
+    // deep copy here.
+    return super.clone();
+  }
+
+  @Override
+  public String toString() {
+    return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": "
+    		+getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": "
+        + getLength() + "}" ;
+  }
+
+  public FragmentProto getProto() {
+    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
+    builder.setId(this.tableName);
+    builder.setStartOffset(this.startOffset);
+    builder.setLength(this.length);
+    builder.setPath(this.uri.toString());
+    if(diskIds != null) {
+      List<Integer> idList = new ArrayList<Integer>();
+      for(int eachId: diskIds) {
+        idList.add(eachId);
+      }
+      builder.addAllDiskIds(idList);
+    }
+
+    if(hosts != null) {
+      builder.addAllHosts(TUtil.newList(hosts));
+    }
+
+    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
+    fragmentBuilder.setId(this.tableName);
+    fragmentBuilder.setStoreType(StoreType.CSV.name());
+    fragmentBuilder.setContents(builder.buildPartial().toByteString());
+    return fragmentBuilder.build();
+  }
+}
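A quick sketch of the serialization round trip this class supports; the path
and hostname below are hypothetical.

    FileFragment frag = new FileFragment("lineitem",
        new Path("hdfs:///warehouse/lineitem/part-0000"), 0L, 64L * 1024 * 1024,
        new String[]{ "dn1.example.com" });
    FragmentProto proto = frag.getProto();
    try {
      // a worker reconstructs the fragment from the serialized contents
      FileFragment restored = new FileFragment(proto.getContents());
      assert frag.equals(restored);  // same path, start offset, and length
    } catch (InvalidProtocolBufferException e) {
      throw new RuntimeException(e);
    }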

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexMethod.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
new file mode 100644
index 0000000..a6af19b
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.TupleComparator;
+
+import java.io.IOException;
+
+public interface IndexMethod {
+  IndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
+                             TupleComparator comparator) throws IOException;
+  IndexReader getIndexReader(final Path fileName, Schema keySchema,
+                             TupleComparator comparator) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexReader.java
new file mode 100644
index 0000000..3ae5c9d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexReader.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public interface IndexReader {
+  
+  /**
+   * Finds the offset of the entry whose key equals the given key.
+   *
+   * @param key the key to look up
+   * @return the offset of the matching entry
+   * @throws java.io.IOException if the index cannot be read
+   */
+  public long find(Tuple key) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
new file mode 100644
index 0000000..04738f8
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public abstract class IndexWriter {
+  
+  public abstract void write(Tuple key, long offset) throws IOException;
+  
+  public abstract void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
new file mode 100644
index 0000000..0c07b4a
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public interface OrderIndexReader extends IndexReader {
+  /**
+   * Finds the offset of the entry whose key is equal to or greater than
+   * the given key.
+   *
+   * @param key the key to look up
+   * @param nextKey if true, finds the entry immediately following the given key
+   * @return the offset of the matching entry
+   * @throws java.io.IOException if the index cannot be read
+   */
+  public long find(Tuple key, boolean nextKey) throws IOException;
+
+  /**
+   * Returns the offset of the entry following the one returned by the
+   * latest call to find() or next().
+   *
+   * @return the next offset
+   * @throws java.io.IOException if the index cannot be read
+   */
+  public long next() throws IOException;
+}
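Taken together with IndexMethod and IndexWriter above, the intended flow is
roughly the sketch below. The IndexMethod implementation, the index level
constant, the negative not-found sentinel, and the setup of indexPath,
keySchema, comparator, keyTuple, tupleOffset, and searchKey are all
assumptions, not part of this patch.

    // Writing: record (key, offset) pairs ordered by the comparator.
    IndexWriter writer = method.getIndexWriter(indexPath, 2, keySchema, comparator);
    writer.write(keyTuple, tupleOffset);
    writer.close();

    // Reading: locate the entry matching searchKey, then walk forward.
    OrderIndexReader reader =
        (OrderIndexReader) method.getIndexReader(indexPath, keySchema, comparator);
    long offset = reader.find(searchKey, false);
    while (offset >= 0) {  // assuming a negative offset means "not found"
      // seek a scanner to 'offset' and read the tuple stored there
      offset = reader.next();
    }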