Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:44 UTC

[07/18] TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
new file mode 100644
index 0000000..41c9d61
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+
+/**
+ * This class is not thread-safe.
+ */
+public class TableStatistics {
+  private Schema schema;
+  private Tuple minValues;
+  private Tuple maxValues;
+  private long [] numNulls;
+  private long numRows = 0;
+  private long numBytes = 0;
+
+
+  private boolean [] comparable;
+
+  public TableStatistics(Schema schema) {
+    this.schema = schema;
+    minValues = new VTuple(schema.getColumnNum());
+    maxValues = new VTuple(schema.getColumnNum());
+
+    numNulls = new long[schema.getColumnNum()];
+    comparable = new boolean[schema.getColumnNum()];
+
+    DataType type;
+    for (int i = 0; i < schema.getColumnNum(); i++) {
+      type = schema.getColumn(i).getDataType();
+      if (type.getType() == Type.PROTOBUF) {
+        comparable[i] = false;
+      } else {
+        comparable[i] = true;
+      }
+    }
+  }
+
+  public Schema getSchema() {
+    return this.schema;
+  }
+
+  public void incrementRow() {
+    numRows++;
+  }
+
+  public long getNumRows() {
+    return this.numRows;
+  }
+
+  public void setNumBytes(long bytes) {
+    this.numBytes = bytes;
+  }
+
+  public long getNumBytes() {
+    return this.numBytes;
+  }
+
+  public void analyzeField(int idx, Datum datum) {
+    if (datum instanceof NullDatum) {
+      numNulls[idx]++;
+      return;
+    }
+
+    if (comparable[idx]) {
+      if (!maxValues.contains(idx) ||
+          maxValues.get(idx).compareTo(datum) < 0) {
+        maxValues.put(idx, datum);
+      }
+      if (!minValues.contains(idx) ||
+          minValues.get(idx).compareTo(datum) > 0) {
+        minValues.put(idx, datum);
+      }
+    }
+  }
+
+  public TableStats getTableStat() {
+    TableStats stat = new TableStats();
+
+    ColumnStats columnStats;
+    for (int i = 0; i < schema.getColumnNum(); i++) {
+      columnStats = new ColumnStats(schema.getColumn(i));
+      columnStats.setNumNulls(numNulls[i]);
+      columnStats.setMinValue(minValues.get(i));
+      columnStats.setMaxValue(maxValues.get(i));
+      stat.addColumnStat(columnStats);
+    }
+
+    stat.setNumRows(this.numRows);
+    stat.setNumBytes(this.numBytes);
+
+    return stat;
+  }
+}
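
A usage sketch (illustrative, not part of this patch): an appender would typically call analyzeField for every column of each row it writes and incrementRow once per row, then hand the accumulated TableStats to the catalog. The schema, values, and byte count below are made up; Schema.addColumn and the DatumFactory calls are assumed from the catalog and datum modules, and imports are omitted.

    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);
    schema.addColumn("name", Type.TEXT);

    TableStatistics stats = new TableStatistics(schema);
    Tuple row = new VTuple(new Datum[] {
        DatumFactory.createInt4(1), DatumFactory.createText("tajo")});
    for (int i = 0; i < schema.getColumnNum(); i++) {
      stats.analyzeField(i, row.get(i));   // updates per-column min/max and null counters
    }
    stats.incrementRow();
    stats.setNumBytes(128);                // bytes actually written by the appender
    TableStats tableStats = stats.getTableStat();  // column stats plus row/byte totals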

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
new file mode 100644
index 0000000..07ea79b
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.protobuf.Message;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+//Compatibility with Apache Hive
+public class TextSerializerDeserializer implements SerializerDeserializer {
+  public static final byte[] trueBytes = "true".getBytes();
+  public static final byte[] falseBytes = "false".getBytes();
+  private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+
+
+  @Override
+  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException {
+
+    byte[] bytes;
+    int length = 0;
+    TajoDataTypes.DataType dataType = col.getDataType();
+
+    if (datum == null || datum instanceof NullDatum) {
+      switch (dataType.getType()) {
+        case CHAR:
+        case TEXT:
+          length = nullCharacters.length;
+          out.write(nullCharacters);
+          break;
+        default:
+          break;
+      }
+      return length;
+    }
+
+    switch (dataType.getType()) {
+      case BOOLEAN:
+        bytes = datum.asBool() ? trueBytes : falseBytes;
+        out.write(bytes);
+        length = bytes.length;
+        break;
+      case CHAR:
+        byte[] pad = new byte[dataType.getLength() - datum.size()];
+        bytes = datum.asTextBytes();
+        out.write(bytes);
+        out.write(pad);
+        length = bytes.length + pad.length;
+        break;
+      case TEXT:
+      case BIT:
+      case INT2:
+      case INT4:
+      case INT8:
+      case FLOAT4:
+      case FLOAT8:
+      case INET4:
+      case DATE:
+      case TIME:
+      case TIMESTAMP:
+        bytes = datum.asTextBytes();
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case INET6:
+      case BLOB:
+        bytes = Base64.encodeBase64(datum.asByteArray(), false);
+        length = bytes.length;
+        out.write(bytes, 0, length);
+        break;
+      case PROTOBUF:
+        ProtobufDatum protobuf = (ProtobufDatum) datum;
+        byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
+        length = protoBytes.length;
+        out.write(protoBytes, 0, protoBytes.length);
+        break;
+      case NULL_TYPE:
+      default:
+        break;
+    }
+    return length;
+  }
+
+  @Override
+  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+
+    Datum datum;
+    switch (col.getDataType().getType()) {
+      case BOOLEAN:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T');
+        break;
+      case BIT:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, length)));
+        break;
+      case CHAR:
+        datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createChar(new String(bytes, offset, length).trim());
+        break;
+      case INT2:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInt2(new String(bytes, offset, length));
+        break;
+      case INT4:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInt4(new String(bytes, offset, length));
+        break;
+      case INT8:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInt8(new String(bytes, offset, length));
+        break;
+      case FLOAT4:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createFloat4(new String(bytes, offset, length));
+        break;
+      case FLOAT8:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createFloat8(new String(bytes, offset, length));
+        break;
+      case TEXT: {
+        byte[] chars = new byte[length];
+        System.arraycopy(bytes, offset, chars, 0, length);
+        datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createText(chars);
+        break;
+      }
+      case DATE:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createDate(new String(bytes, offset, length));
+        break;
+      case TIME:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createTime(new String(bytes, offset, length));
+        break;
+      case TIMESTAMP:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createTimeStamp(new String(bytes, offset, length));
+        break;
+      case PROTOBUF: {
+        if (isNull(bytes, offset, length, nullCharacters)) {
+          datum = NullDatum.get();
+        } else {
+          ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
+          Message.Builder builder = factory.newBuilder();
+          try {
+            byte[] protoBytes = new byte[length];
+            System.arraycopy(bytes, offset, protoBytes, 0, length);
+            protobufJsonFormat.merge(protoBytes, builder);
+            datum = factory.createDatum(builder.build());
+          } catch (IOException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+        }
+        break;
+      }
+      case INET4:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInet4(new String(bytes, offset, length));
+        break;
+      case BLOB: {
+        if (isNull(bytes, offset, length, nullCharacters)) {
+          datum = NullDatum.get();
+        } else {
+          byte[] blob = new byte[length];
+          System.arraycopy(bytes, offset, blob, 0, length);
+          datum = DatumFactory.createBlob(Base64.decodeBase64(blob));
+        }
+        break;
+      }
+      default:
+        datum = NullDatum.get();
+        break;
+    }
+    return datum;
+  }
+
+  private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) {
+    return length == 0 || ((length == nullBytes.length)
+        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length));
+  }
+
+  private static boolean isNullText(byte[] val, int offset, int length, byte[] nullBytes) {
+    return length > 0 && length == nullBytes.length
+        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length);
+  }
+}
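
A round-trip sketch (illustrative, not part of this patch): serialize a TEXT datum into a byte stream and read it back. The Column(String, Type) constructor and the "\N" null marker are assumptions (Hive's text format uses "\N" by default); imports and the checked IOException handling are omitted.

    Column col = new Column("name", Type.TEXT);          // assumed constructor
    byte[] nullChars = "\\N".getBytes();                 // Hive-style null marker

    TextSerializerDeserializer serde = new TextSerializerDeserializer();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int written = serde.serialize(col, DatumFactory.createText("tajo"), out, nullChars);

    byte[] buf = out.toByteArray();
    Datum restored = serde.deserialize(col, buf, 0, written, nullChars);  // TextDatum "tajo"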

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
new file mode 100644
index 0000000..ba35988
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.datum.*;
+
+import java.net.InetAddress;
+
+public interface Tuple extends Cloneable {
+  
+	public int size();
+	
+	public boolean contains(int fieldid);
+
+  public boolean isNull(int fieldid);
+	
+	public void clear();
+	
+	public void put(int fieldId, Datum value);
+
+  public void put(int fieldId, Datum [] values);
+
+  public void put(int fieldId, Tuple tuple);
+	
+	public void put(Datum [] values);
+	
+	public Datum get(int fieldId);
+	
+	public void setOffset(long offset);
+	
+	public long getOffset();
+
+	public BooleanDatum getBoolean(int fieldId);
+	
+	public BitDatum getByte(int fieldId);
+
+  public CharDatum getChar(int fieldId);
+	
+	public BlobDatum getBytes(int fieldId);
+	
+	public Int2Datum getShort(int fieldId);
+	
+	public Int4Datum getInt(int fieldId);
+	
+	public Int8Datum getLong(int fieldId);
+	
+	public Float4Datum getFloat(int fieldId);
+	
+	public Float8Datum getDouble(int fieldId);
+	
+	public Inet4Datum getIPv4(int fieldId);
+	
+	public byte [] getIPv4Bytes(int fieldId);
+	
+	public InetAddress getIPv6(int fieldId);
+	
+	public byte [] getIPv6Bytes(int fieldId);
+	
+	public TextDatum getString(int fieldId);
+
+  public TextDatum getText(int fieldId);
+
+  public Tuple clone() throws CloneNotSupportedException;
+
+  public Datum[] getValues();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
new file mode 100644
index 0000000..69c1c04
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
+import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
+
+/**
+ * The Comparator class for Tuples
+ * 
+ * @see Tuple
+ */
+public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> {
+  private final int[] sortKeyIds;
+  private final boolean[] asc;
+  private final boolean[] nullFirsts;
+
+  private Datum left;
+  private Datum right;
+  private int compVal;
+
+  public TupleComparator(Schema schema, SortSpec[] sortKeys) {
+    Preconditions.checkArgument(sortKeys.length > 0, 
+        "At least one sort key must be specified.");
+
+    this.sortKeyIds = new int[sortKeys.length];
+    this.asc = new boolean[sortKeys.length];
+    this.nullFirsts = new boolean[sortKeys.length];
+    for (int i = 0; i < sortKeys.length; i++) {
+      this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
+          
+      this.asc[i] = sortKeys[i].isAscending();
+      this.nullFirsts[i]= sortKeys[i].isNullFirst();
+    }
+  }
+
+  public TupleComparator(TupleComparatorProto proto) {
+    this.sortKeyIds = new int[proto.getCompSpecsCount()];
+    this.asc = new boolean[proto.getCompSpecsCount()];
+    this.nullFirsts = new boolean[proto.getCompSpecsCount()];
+
+    for (int i = 0; i < proto.getCompSpecsCount(); i++) {
+      TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i);
+      sortKeyIds[i] = sortSepcProto.getColumnId();
+      asc[i] = sortSepcProto.getAscending();
+      nullFirsts[i] = sortSepcProto.getNullFirst();
+    }
+  }
+
+  public boolean isAscendingFirstKey() {
+    return this.asc[0];
+  }
+
+  @Override
+  public int compare(Tuple tuple1, Tuple tuple2) {
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      left = tuple1.get(sortKeyIds[i]);
+      right = tuple2.get(sortKeyIds[i]);
+
+      if (left instanceof NullDatum || right instanceof NullDatum) {
+        if (!left.equals(right)) {
+          if (left instanceof NullDatum) {
+            compVal = 1;
+          } else if (right instanceof NullDatum) {
+            compVal = -1;
+          }
+          if (nullFirsts[i]) {
+            if (compVal != 0) {
+              compVal *= -1;
+            }
+          }
+        } else {
+          compVal = 0;
+        }
+      } else {
+        if (asc[i]) {
+          compVal = left.compareTo(right);
+        } else {
+          compVal = right.compareTo(left);
+        }
+      }
+
+      if (compVal < 0 || compVal > 0) {
+        return compVal;
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(sortKeyIds);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof TupleComparator) {
+      TupleComparator other = (TupleComparator) obj;
+      if (sortKeyIds.length != other.sortKeyIds.length) {
+        return false;
+      }
+
+      for (int i = 0; i < sortKeyIds.length; i++) {
+        if (sortKeyIds[i] != other.sortKeyIds[i] ||
+            asc[i] != other.asc[i] ||
+            nullFirsts[i] != other.nullFirsts[i]) {
+          return false;
+        }
+      }
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public TupleComparatorProto getProto() {
+    TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
+    TupleComparatorSpecProto.Builder sortSpecBuilder;
+
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
+      sortSpecBuilder.setColumnId(sortKeyIds[i]);
+      sortSpecBuilder.setAscending(asc[i]);
+      sortSpecBuilder.setNullFirst(nullFirsts[i]);
+      builder.addCompSpecs(sortSpecBuilder);
+    }
+
+    return builder.build();
+  }
+}
\ No newline at end of file
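
A sorting sketch (illustrative, not from this patch): build a comparator over a single descending sort key and use it with Collections.sort. Schema.addColumn and DatumFactory.createInt8 are assumed from the catalog and datum modules; imports are omitted.

    Schema schema = new Schema();
    schema.addColumn("score", Type.INT8);
    SortSpec[] specs = { new SortSpec(schema.getColumn(0), false, false) };  // descending, nulls last

    TupleComparator comparator = new TupleComparator(schema, specs);
    List<Tuple> tuples = new ArrayList<Tuple>();
    tuples.add(new VTuple(new Datum[]{ DatumFactory.createInt8(10) }));
    tuples.add(new VTuple(new Datum[]{ DatumFactory.createInt8(42) }));
    Collections.sort(tuples, comparator);   // 42 comes first because the key is descending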

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
new file mode 100644
index 0000000..7d0f674
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+
+import java.util.Comparator;
+
+public class TupleRange implements Comparable<TupleRange> {
+  private final Schema schema;
+  private final Tuple start;
+  private final Tuple end;
+  private final TupleComparator comp;
+
+  public TupleRange(final Schema schema, final Tuple start, final Tuple end) {
+    this.comp = new TupleComparator(schema, schemaToSortSpecs(schema));
+    // if there is only one value, start == end
+    Preconditions.checkArgument(comp.compare(start, end) <= 0, ("start=" + start) + ", end=" + end);
+    this.schema = schema;
+    this.start = start;
+    this.end = end;
+  }
+
+  public static SortSpec[] schemaToSortSpecs(Schema schema) {
+    SortSpec[] specs = new SortSpec[schema.getColumnNum()];
+
+    for (int i = 0; i < schema.getColumnNum(); i++) {
+      specs[i] = new SortSpec(schema.getColumn(i), true, false);
+    }
+
+    return specs;
+  }
+
+  public final Schema getSchema() {
+    return this.schema;
+  }
+
+  public final Tuple getStart() {
+    return this.start;
+  }
+
+  public final Tuple getEnd() {
+    return this.end;
+  }
+
+  public String toString() {
+    return "[" + this.start + ", " + this.end+")";
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(start, end);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof  TupleRange) {
+      TupleRange other = (TupleRange) obj;
+      return this.start.equals(other.start) && this.end.equals(other.end);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int compareTo(TupleRange o) {
+    // TODO - should handle overlap
+    int cmpVal = comp.compare(this.start, o.start);
+    if (cmpVal != 0) {
+      return cmpVal;
+    } else {
+      return comp.compare(this.end, o.end);
+    }
+  }
+
+  public static class DescendingTupleRangeComparator
+      implements Comparator<TupleRange> {
+
+    @Override
+    public int compare(TupleRange left, TupleRange right) {
+      return -(left.compareTo(right));
+    }
+  }
+}
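
An illustrative sketch of how ranges might be built and ordered, e.g. for range partitioning; the column and boundary values are arbitrary and imports are omitted.

    Schema schema = new Schema();
    schema.addColumn("key", Type.INT4);

    TupleRange r1 = new TupleRange(schema,
        new VTuple(new Datum[]{ DatumFactory.createInt4(0) }),
        new VTuple(new Datum[]{ DatumFactory.createInt4(100) }));
    TupleRange r2 = new TupleRange(schema,
        new VTuple(new Datum[]{ DatumFactory.createInt4(100) }),
        new VTuple(new Datum[]{ DatumFactory.createInt4(200) }));

    TupleRange[] ranges = { r1, r2 };
    Arrays.sort(ranges, new TupleRange.DescendingTupleRangeComparator());  // r2, then r1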

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
new file mode 100644
index 0000000..878c05e
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.exception.InvalidCastException;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+
+public class VTuple implements Tuple, Cloneable {
+	@Expose public Datum [] values;
+	@Expose private long offset;
+	
+	public VTuple(int size) {
+		values = new Datum[size];
+	}
+
+  public VTuple(Tuple tuple) {
+    this.values = new Datum[tuple.size()];
+    System.arraycopy(((VTuple)tuple).values, 0, values, 0, tuple.size());
+    this.offset = ((VTuple)tuple).offset;
+  }
+
+  public VTuple(Datum [] datum) {
+    this(datum.length);
+    put(datum);
+  }
+
+	@Override
+	public int size() {	
+		return values.length;
+	}
+	
+	public boolean contains(int fieldId) {
+		return values[fieldId] != null;
+	}
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return values[fieldid] instanceof NullDatum;
+  }
+
+  @Override
+  public void clear() {   
+    for (int i=0; i < values.length; i++) {
+      values[i] = null;
+    }
+  }
+	
+	//////////////////////////////////////////////////////
+	// Setter
+	//////////////////////////////////////////////////////	
+	public void put(int fieldId, Datum value) {
+		values[fieldId] = value;
+	}
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    for (int i = fieldId, j = 0; j < values.length; i++, j++) {
+      this.values[i] = values[j];
+    }
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
+      values[i] = tuple.get(j);
+    }
+  }
+
+  public void put(Datum [] values) {
+    System.arraycopy(values, 0, this.values, 0, size());
+	}
+	
+	//////////////////////////////////////////////////////
+	// Getter
+	//////////////////////////////////////////////////////
+	public Datum get(int fieldId) {
+		return this.values[fieldId];
+	}
+	
+	public void setOffset(long offset) {
+	  this.offset = offset;
+	}
+	
+	public long getOffset() {
+	  return this.offset;
+	}
+	
+	@Override
+	public BooleanDatum getBoolean(int fieldId) {
+		return (BooleanDatum) values[fieldId];
+	}
+
+	public BitDatum getByte(int fieldId) {
+		return (BitDatum) values[fieldId];
+	}
+
+  public CharDatum getChar(int fieldId) {
+    return (CharDatum) values[fieldId];
+  }
+
+	public BlobDatum getBytes(int fieldId) {
+		return (BlobDatum) values[fieldId];
+	}
+
+	public Int2Datum getShort(int fieldId) {
+		return (Int2Datum) values[fieldId];
+	}
+
+	public Int4Datum getInt(int fieldId) {
+		return (Int4Datum) values[fieldId];
+	}
+
+	public Int8Datum getLong(int fieldId) {
+		return (Int8Datum) values[fieldId];
+	}
+
+	public Float4Datum getFloat(int fieldId) {
+		return (Float4Datum) values[fieldId];
+	}
+
+	public Float8Datum getDouble(int fieldId) {
+		return (Float8Datum) values[fieldId];
+	}
+
+	public Inet4Datum getIPv4(int fieldId) {
+		return (Inet4Datum) values[fieldId];
+	}
+
+	public byte[] getIPv4Bytes(int fieldId) {
+		return values[fieldId].asByteArray();
+	}
+
+	public InetAddress getIPv6(int fieldId) {
+		throw new InvalidCastException("IPv6 is unsupported yet");
+	}
+
+	public byte[] getIPv6Bytes(int fieldId) {
+	  throw new InvalidCastException("IPv6 is unsupported yet");
+	}
+
+	public TextDatum getString(int fieldId) {
+		return (TextDatum) values[fieldId];
+	}
+
+  @Override
+  public TextDatum getText(int fieldId) {
+    return (TextDatum) values[fieldId];
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    VTuple tuple = (VTuple) super.clone();
+
+    tuple.values = new Datum[size()];
+    System.arraycopy(values, 0, tuple.values, 0, size()); //shallow copy
+    return tuple;
+  }
+
+  public String toString() {
+		boolean first = true;
+		StringBuilder str = new StringBuilder();
+		str.append("(");
+		for(int i=0; i < values.length; i++) {			
+			if(values[i] != null) {
+				if(first) {
+					first = false;
+				} else {
+					str.append(", ");
+				}
+				str.append(i)
+				.append("=>")
+				.append(values[i]);
+			}
+		}
+		str.append(")");
+		return str.toString();
+	}
+	
+	@Override
+	public int hashCode() {
+	  int hashCode = 37;
+	  for (int i=0; i < values.length; i++) {
+	    if(values[i] != null) {
+        hashCode ^= (values[i].hashCode() * 41);
+	    } else {
+	      hashCode = hashCode ^ (i + 17);
+	    }
+	  }
+	  
+	  return hashCode;
+	}
+
+  @Override
+  public Datum[] getValues() {
+    return values;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof Tuple) {
+      Tuple other = (Tuple) obj;
+      return Arrays.equals(getValues(), other.getValues());
+    }
+    return false;
+  }
+}
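
A small sketch of the setter/getter contract (illustrative, not part of the patch); note that clone() copies the value array but not the Datum instances themselves. Datum.asInt4() is assumed from the datum module, and imports are omitted.

    VTuple tuple = new VTuple(3);
    tuple.put(0, DatumFactory.createInt4(7));
    tuple.put(1, DatumFactory.createText("tajo"));
    tuple.put(2, NullDatum.get());

    int id = tuple.getInt(0).asInt4();       // typed accessor, then the primitive value
    boolean isThirdNull = tuple.isNull(2);   // true
    try {
      Tuple copy = tuple.clone();            // new Datum[], same Datum references
    } catch (CloneNotSupportedException e) {
      throw new RuntimeException(e);
    }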

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java b/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
new file mode 100644
index 0000000..ad19101
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface ForSplitableStore {
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java b/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
new file mode 100644
index 0000000..baeda8c
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.compress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DoNotPool;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A global compressor/decompressor pool used to save and reuse (possibly
+ * native) compression/decompression codecs.
+ */
+public final class CodecPool {
+  private static final Log LOG = LogFactory.getLog(CodecPool.class);
+
+  /**
+   * A global compressor pool used to save the expensive
+   * construction/destruction of (possibly native) compression codecs.
+   */
+  private static final Map<Class<Compressor>, List<Compressor>> COMPRESSOR_POOL =
+      new HashMap<Class<Compressor>, List<Compressor>>();
+
+  /**
+   * A global decompressor pool used to save the expensive
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final Map<Class<Decompressor>, List<Decompressor>> DECOMPRESSOR_POOL =
+      new HashMap<Class<Decompressor>, List<Decompressor>>();
+
+  private static <T> T borrow(Map<Class<T>, List<T>> pool,
+      Class<? extends T> codecClass) {
+    T codec = null;
+
+    // Check if an appropriate codec is available
+    synchronized (pool) {
+      if (pool.containsKey(codecClass)) {
+        List<T> codecList = pool.get(codecClass);
+
+        if (codecList != null) {
+          synchronized (codecList) {
+            if (!codecList.isEmpty()) {
+              codec = codecList.remove(codecList.size() - 1);
+            }
+          }
+        }
+      }
+    }
+
+    return codec;
+  }
+
+  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
+    if (codec != null) {
+      Class<T> codecClass = (Class<T>) codec.getClass();
+      synchronized (pool) {
+        if (!pool.containsKey(codecClass)) {
+          pool.put(codecClass, new ArrayList<T>());
+        }
+
+        List<T> codecList = pool.get(codecClass);
+        synchronized (codecList) {
+          codecList.add(codec);
+        }
+      }
+    }
+  }
+
+  /**
+   * Get a {@link Compressor} for the given {@link CompressionCodec} from the
+   * pool or a new one.
+   *
+   * @param codec
+   *          the <code>CompressionCodec</code> for which to get the
+   *          <code>Compressor</code>
+   * @param conf the <code>Configuration</code> object used to create or reinitialize the compressor
+   * @return <code>Compressor</code> for the given <code>CompressionCodec</code>
+   *         from the pool or a new one
+   */
+  public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
+    Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
+    if (compressor == null) {
+      compressor = codec.createCompressor();
+      LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
+    } else {
+      compressor.reinit(conf);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Got recycled compressor");
+      }
+    }
+    return compressor;
+  }
+
+  public static Compressor getCompressor(CompressionCodec codec) {
+    return getCompressor(codec, null);
+  }
+
+  /**
+   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
+   * pool or a new one.
+   *
+   * @param codec
+   *          the <code>CompressionCodec</code> for which to get the
+   *          <code>Decompressor</code>
+   * @return <code>Decompressor</code> for the given
+   *         <code>CompressionCodec</code> from the pool or a new one
+   */
+  public static Decompressor getDecompressor(CompressionCodec codec) {
+    Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec
+        .getDecompressorType());
+    if (decompressor == null) {
+      decompressor = codec.createDecompressor();
+      LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
+    } else {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Got recycled decompressor");
+      }
+    }
+    return decompressor;
+  }
+
+  /**
+   * Return the {@link Compressor} to the pool.
+   *
+   * @param compressor
+   *          the <code>Compressor</code> to be returned to the pool
+   */
+  public static void returnCompressor(Compressor compressor) {
+    if (compressor == null) {
+      return;
+    }
+    // if the compressor can't be reused, don't pool it.
+    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      return;
+    }
+    compressor.reset();
+    payback(COMPRESSOR_POOL, compressor);
+  }
+
+  /**
+   * Return the {@link Decompressor} to the pool.
+   *
+   * @param decompressor
+   *          the <code>Decompressor</code> to be returned to the pool
+   */
+  public static void returnDecompressor(Decompressor decompressor) {
+    if (decompressor == null) {
+      return;
+    }
+    // if the decompressor can't be reused, don't pool it.
+    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      return;
+    }
+    decompressor.reset();
+    payback(DECOMPRESSOR_POOL, decompressor);
+  }
+
+  private CodecPool() {
+    // prevent instantiation
+  }
+}
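
A borrow/return sketch (illustrative, not from this patch), using Hadoop's DefaultCodec so no native library is required; fileOut and data are hypothetical placeholders, and imports plus IOException handling are omitted.

    Configuration conf = new Configuration();
    CompressionCodec codec = ReflectionUtils.newInstance(DefaultCodec.class, conf);

    Compressor compressor = CodecPool.getCompressor(codec, conf);
    try {
      OutputStream compressed = codec.createOutputStream(fileOut, compressor);  // fileOut: some OutputStream
      compressed.write(data);                                                   // data: some byte[]
      compressed.close();
    } finally {
      CodecPool.returnCompressor(compressor);   // resets the compressor and puts it back in the pool
    }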

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
new file mode 100644
index 0000000..bb035a8
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.storage.exception;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+public class AlreadyExistsStorageException extends IOException {
+  private static final long serialVersionUID = 965518916144019032L;
+
+
+  public AlreadyExistsStorageException(String path) {
+    super("Error: "+path+" alreay exists");    
+  }
+  
+  public AlreadyExistsStorageException(Path path) {
+    this(path.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
new file mode 100644
index 0000000..a67d1f7
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.exception;
+
+public class UnknownCodecException extends Exception {
+
+  private static final long serialVersionUID = 4287230843540404529L;
+
+  public UnknownCodecException() {
+
+  }
+
+  public UnknownCodecException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
new file mode 100644
index 0000000..d18b5a0
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.exception;
+
+public class UnknownDataTypeException extends Exception {
+
+  private static final long serialVersionUID = -2630390595968966164L;
+
+  public UnknownDataTypeException() {
+
+  }
+
+  public UnknownDataTypeException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
new file mode 100644
index 0000000..8b197d6
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.storage.exception;
+
+public class UnsupportedFileTypeException extends RuntimeException {
+	private static final long serialVersionUID = -8160289695849000342L;
+
+	public UnsupportedFileTypeException() {
+	}
+
+	/**
+	 * @param message
+	 */
+	public UnsupportedFileTypeException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
new file mode 100644
index 0000000..ea8bf9f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FileFragmentProto;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable {
+  @Expose private String tableName; // required
+  @Expose private Path uri; // required
+  @Expose private Long startOffset; // required
+  @Expose private Long length; // required
+
+  private String[] hosts; // Datanode hostnames
+  @Expose private int[] diskIds;
+
+  public FileFragment(ByteString raw) throws InvalidProtocolBufferException {
+    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
+    builder.mergeFrom(raw);
+    init(builder.build());
+  }
+
+  public FileFragment(String tableName, Path uri, BlockLocation blockLocation, int[] diskIds)
+      throws IOException {
+    this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(),
+        blockLocation.getHosts(), diskIds);
+  }
+
+  // Non splittable
+  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) {
+    this.set(tableName, uri, start, length, null, null);
+    this.hosts = hosts;
+  }
+
+  public FileFragment(String fragmentId, Path path, long start, long length) {
+    this.set(fragmentId, path, start, length, null, null);
+  }
+
+  public FileFragment(FileFragmentProto proto) {
+    init(proto);
+  }
+
+  private void init(FileFragmentProto proto) {
+    int[] diskIds = new int[proto.getDiskIdsList().size()];
+    int i = 0;
+    for(Integer eachValue: proto.getDiskIdsList()) {
+      diskIds[i++] = eachValue;
+    }
+    this.set(proto.getId(), new Path(proto.getPath()),
+        proto.getStartOffset(), proto.getLength(),
+        proto.getHostsList().toArray(new String[]{}),
+        diskIds);
+  }
+
+  private void set(String tableName, Path path, long start,
+      long length, String[] hosts, int[] diskIds) {
+    this.tableName = tableName;
+    this.uri = path;
+    this.startOffset = start;
+    this.length = length;
+    this.hosts = hosts;
+    this.diskIds = diskIds;
+  }
+
+
+  /**
+   * Get the list of hosts (hostname) hosting this block
+   */
+  public String[] getHosts() {
+    if (hosts == null) {
+      this.hosts = new String[0];
+    }
+    return hosts;
+  }
+
+  /**
+   * Get the list of Disk Ids
+   * Unknown disk is -1. Others 0 ~ N
+   */
+  public int[] getDiskIds() {
+    if (diskIds == null) {
+      this.diskIds = new int[getHosts().length];
+      Arrays.fill(this.diskIds, -1);
+    }
+    return diskIds;
+  }
+
+  public String getTableName() {
+    return this.tableName;
+  }
+
+  public Path getPath() {
+    return this.uri;
+  }
+
+  public void setPath(Path path) {
+    this.uri = path;
+  }
+
+  public Long getStartKey() {
+    return this.startOffset;
+  }
+
+  public Long getEndKey() {
+    return this.length;
+  }
+
+  /**
+   * The offset ranges of tablets <b>MUST NOT</b> overlap.
+   *
+   * @param t the fragment to compare with
+   * @return the sign of the start offset difference; -1 if the table paths differ
+   */
+  @Override
+  public int compareTo(FileFragment t) {
+    if (getPath().equals(t.getPath())) {
+      long diff = this.getStartKey() - t.getStartKey();
+      if (diff < 0) {
+        return -1;
+      } else if (diff > 0) {
+        return 1;
+      } else {
+        return 0;
+      }
+    } else {
+      return -1;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof FileFragment) {
+      FileFragment t = (FileFragment) o;
+      if (getPath().equals(t.getPath())
+          && TUtil.checkEquals(t.getStartKey(), this.getStartKey())
+          && TUtil.checkEquals(t.getEndKey(), this.getEndKey())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(tableName, uri, startOffset, length);
+  }
+  
+  public Object clone() throws CloneNotSupportedException {
+    FileFragment frag = (FileFragment) super.clone();
+    frag.tableName = tableName;
+    frag.uri = uri;
+    frag.diskIds = diskIds;
+    frag.hosts = hosts;
+
+    return frag;
+  }
+
+  @Override
+  public String toString() {
+    return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": "
+    		+getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": "
+        + getEndKey() + "}" ;
+  }
+
+  public FragmentProto getProto() {
+    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
+    builder.setId(this.tableName);
+    builder.setStartOffset(this.startOffset);
+    builder.setLength(this.length);
+    builder.setPath(this.uri.toString());
+    if(diskIds != null) {
+      List<Integer> idList = new ArrayList<Integer>();
+      for(int eachId: diskIds) {
+        idList.add(eachId);
+      }
+      builder.addAllDiskIds(idList);
+    }
+
+    if(hosts != null) {
+      builder.addAllHosts(TUtil.newList(hosts));
+    }
+
+    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
+    fragmentBuilder.setId(this.tableName);
+    fragmentBuilder.setContents(builder.buildPartial().toByteString());
+    return fragmentBuilder.build();
+  }
+}
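
A construction sketch (illustrative): a fragment describing the first 64MB of a table file, serialized into the generic FragmentProto that the engine passes around. The table name and path are made up; imports are omitted.

    FileFragment fragment = new FileFragment("employee",
        new Path("hdfs:///tajo/warehouse/employee/part-0000"), 0, 64L * 1024 * 1024);
    FragmentProto proto = fragment.getProto();   // FileFragmentProto packed into FragmentProto.contents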

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
new file mode 100644
index 0000000..3f9c160
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import org.apache.tajo.common.ProtoObject;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public interface Fragment extends ProtoObject<FragmentProto> {
+
+  public abstract String getTableName();
+
+  @Override
+  public abstract FragmentProto getProto();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
new file mode 100644
index 0000000..3bfe96f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.annotation.ThreadSafe;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+
+@ThreadSafe
+public class FragmentConvertor {
+  /**
+   * Cache of fragment classes
+   */
+  protected static final Map<String, Class<? extends Fragment>> CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap();
+  /**
+   * Cache of constructors for each class.
+   */
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+  /**
+   * default parameter for all constructors
+   */
+  private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class };
+
+  public static Class<? extends Fragment> getFragmentClass(Configuration conf, StoreType storeType)
+      throws IOException {
+    String handlerName = storeType.name().toLowerCase();
+    Class<? extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(handlerName);
+    if (fragmentClass == null) {
+      fragmentClass = conf.getClass(
+          String.format("tajo.storage.fragment.%s.class", handlerName), null, Fragment.class);
+      if (fragmentClass == null) {
+        // Do not cache a missing class; the concurrent map rejects null values.
+        throw new IOException("No fragment class is registered for " + storeType.name());
+      }
+      CACHED_FRAGMENT_CLASSES.put(handlerName, fragmentClass);
+    }
+
+    return fragmentClass;
+  }
+
+  public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto fragment) {
+    T result;
+    try {
+      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
+      if (constructor == null) {
+        constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS);
+        constructor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(clazz, constructor);
+      }
+      result = constructor.newInstance(new Object[]{fragment.getContents()});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return result;
+  }
+
+  public static <T extends Fragment> T convert(Configuration conf, StoreType storeType, FragmentProto fragment)
+      throws IOException {
+    Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, storeType);
+    if (fragmentClass == null) {
+      throw new IOException("No such a fragment class for " + storeType.name());
+    }
+    return convert(fragmentClass, fragment);
+  }
+
+  public static <T extends Fragment> List<T> convert(Class<T> clazz, FragmentProto...fragments)
+      throws IOException {
+    List<T> list = Lists.newArrayList();
+    for (FragmentProto proto : fragments) {
+      list.add(convert(clazz, proto));
+    }
+    return list;
+  }
+
+  public static <T extends Fragment> List<T> convert(Configuration conf, StoreType storeType,
+                                                           FragmentProto...fragments) throws IOException {
+    List<T> list = Lists.newArrayList();
+    for (FragmentProto proto : fragments) {
+      list.add((T) convert(conf, storeType, proto));
+    }
+    return list;
+  }
+
+  public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) {
+    List<FragmentProto> list = Lists.newArrayList();
+    for (Fragment fragment : fragments) {
+      list.add(fragment.getProto());
+    }
+    return list;
+  }
+
+  public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) {
+    List<FragmentProto> list = toFragmentProtoList(fragments);
+    return list.toArray(new FragmentProto[list.size()]);
+  }
+}
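
For reference, here is a hedged usage sketch of the convertor above (an editor's illustration, not code from this patch). "FileFragment" is an assumed Fragment implementation with a ByteString-based constructor, "fragmentProto" is an already-serialized FragmentProto, and the CSV store type is used only as an example; the configuration key pattern and the convert()/toFragmentProtoArray() calls are the ones defined by FragmentConvertor itself.

  Configuration conf = new Configuration();
  // Register the Fragment implementation under the key that getFragmentClass()
  // looks up: tajo.storage.fragment.<storetype>.class
  conf.setClass("tajo.storage.fragment.csv.class", FileFragment.class, Fragment.class);

  // Deserialize a FragmentProto back into a typed Fragment.
  Fragment fragment = FragmentConvertor.convert(conf, StoreType.CSV, fragmentProto);

  // Serialize fragments again for shipping inside protobuf messages.
  FragmentProto [] protos = FragmentConvertor.toFragmentProtoArray(fragment);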

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
new file mode 100644
index 0000000..74be7ff
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.TupleComparator;
+
+import java.io.IOException;
+
+public interface IndexMethod {
+  IndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
+      TupleComparator comparator) throws IOException;
+  IndexReader getIndexReader(final Path fileName, Schema keySchema,
+      TupleComparator comparator) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java
new file mode 100644
index 0000000..7baf7aa
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public interface IndexReader {
+  
+  /**
+   * Finds the offset of the entry whose key is equal to the given key.
+   *
+   * @param key the key to look up
+   * @return the offset of the matching entry, or -1 if no exact match exists
+   * @throws IOException
+   */
+  public long find(Tuple key) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
new file mode 100644
index 0000000..04738f8
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public abstract class IndexWriter {
+  
+  public abstract void write(Tuple key, long offset) throws IOException;
+  
+  public abstract void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
new file mode 100644
index 0000000..688bbc7
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public interface OrderIndexReader extends IndexReader {
+  /**
+   * Finds the offset of the entry whose key is equal to or greater than
+   * the given key.
+   *
+   * @param key the key to look up
+   * @param nextKey if true, return the entry that follows the matched position
+   * @return the offset of the found entry, or -1 if none exists
+   * @throws IOException
+   */
+  public long find(Tuple key, boolean nextKey) throws IOException;
+
+  /**
+   * Returns the offset that follows the one returned by the latest call to
+   * find() or next().
+   *
+   * @return the next offset, or -1 if no more entries remain
+   * @throws IOException
+   */
+  public long next() throws IOException;
+}
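
The two methods above are intended to be used together as a forward scan: position the reader with find(), then walk the remaining offsets with next(). A hedged sketch of that pattern follows (editor's illustration, not part of the patch); "reader" stands for any OrderIndexReader implementation, such as the BSTIndexReader added later in this change, and "startKey" is assumed to be built by the caller.

  long offset = reader.find(startKey, true);   // entry following the matched position
  while (offset != -1) {
    // ... fetch the tuple stored at 'offset' from the data file ...
    offset = reader.next();                    // -1 once the index is exhausted
  }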

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
new file mode 100644
index 0000000..bc8fe96
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -0,0 +1,623 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index.bst;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.index.IndexMethod;
+import org.apache.tajo.storage.index.IndexWriter;
+import org.apache.tajo.storage.index.OrderIndexReader;
+import org.apache.tajo.util.Bytes;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
+
+/**
+ * This is a two-level binary search tree (BST) index. It is a value-list
+ * index structure, so it is inefficient when many of the indexed values are
+ * identical. The BST index performs best when the selectivity of the rows to
+ * be retrieved is less than 5%.
+ * BSTIndexWriter is not thread-safe, whereas BSTIndexReader is thread-safe.
+ */
+public class BSTIndex implements IndexMethod {
+  private static final Log LOG = LogFactory.getLog(BSTIndex.class);
+
+  public static final int ONE_LEVEL_INDEX = 1;
+  public static final int TWO_LEVEL_INDEX = 2;
+
+  private final Configuration conf;
+
+  public BSTIndex(final Configuration conf) {
+    this.conf = conf;
+  }
+  
+  @Override
+  public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
+      TupleComparator comparator) throws IOException {
+    return new BSTIndexWriter(fileName, level, keySchema, comparator);
+  }
+
+  @Override
+  public BSTIndexReader getIndexReader(Path fileName, Schema keySchema,
+      TupleComparator comparator) throws IOException {
+    return new BSTIndexReader(fileName, keySchema, comparator);
+  }
+
+  public BSTIndexReader getIndexReader(Path fileName) throws IOException {
+    return new BSTIndexReader(fileName);
+  }
+
+  public class BSTIndexWriter extends IndexWriter implements Closeable {
+    private FSDataOutputStream out;
+    private FileSystem fs;
+    private int level;
+    private int loadNum = 4096;
+    private Path fileName;
+
+    private final Schema keySchema;
+    private final TupleComparator comparator;
+    private final KeyOffsetCollector collector;
+    private KeyOffsetCollector rootCollector;
+
+    private Tuple firstKey;
+    private Tuple lastKey;
+
+    /**
+     * Constructor.
+     *
+     * @param level BSTIndex.ONE_LEVEL_INDEX or BSTIndex.TWO_LEVEL_INDEX
+     * @throws IOException
+     */
+    public BSTIndexWriter(final Path fileName, int level, Schema keySchema,
+        TupleComparator comparator) throws IOException {
+      this.fileName = fileName;
+      this.level = level;
+      this.keySchema = keySchema;
+      this.comparator = comparator;
+      this.collector = new KeyOffsetCollector(comparator);
+    }
+
+    public void setLoadNum(int loadNum) {
+      this.loadNum = loadNum;
+    }
+
+    public void open() throws IOException {
+      fs = fileName.getFileSystem(conf);
+      if (fs.exists(fileName)) {
+        throw new IOException("ERROR: index file (" + fileName + " already exists");
+      }
+      out = fs.create(fileName);
+    }
+
+    @Override
+    public void write(Tuple key, long offset) throws IOException {
+      if (firstKey == null || comparator.compare(key, firstKey) < 0) {
+        firstKey = key;
+      }
+      if (lastKey == null || comparator.compare(lastKey, key) < 0) {
+        lastKey = key;
+      }
+
+      collector.put(key, offset);
+    }
+
+    public TupleComparator getComparator() {
+      return this.comparator;
+    }
+
+    public void flush() throws IOException {
+      out.flush();
+    }
+
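+    /**
+     * Writes the index file header: the serialized key schema, the serialized
+     * comparator, the index level, the entry count and, for a non-empty index,
+     * the encoded first and last keys, in that order.
+     */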
+    public void writeHeader(int entryNum) throws IOException {
+      // schema
+      byte [] schemaBytes = keySchema.getProto().toByteArray();
+      out.writeInt(schemaBytes.length);
+      out.write(schemaBytes);
+
+      // comparator
+      byte [] comparatorBytes = comparator.getProto().toByteArray();
+      out.writeInt(comparatorBytes.length);
+      out.write(comparatorBytes);
+
+      // level
+      out.writeInt(this.level);
+      // entry
+      out.writeInt(entryNum);
+      if (entryNum > 0) {
+        byte [] minBytes = RowStoreUtil.RowStoreEncoder.toBytes(keySchema,
+            firstKey);
+        out.writeInt(minBytes.length);
+        out.write(minBytes);
+        byte [] maxBytes = RowStoreUtil.RowStoreEncoder.toBytes(keySchema,
+            lastKey);
+        out.writeInt(maxBytes.length);
+        out.write(maxBytes);
+      }
+      out.flush();
+    }
+
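+    /**
+     * Flushes the collected entries to disk. Each leaf entry is written as
+     * (key length, key bytes, offset count, offsets). For a two-level index,
+     * a companion file named fileName + ".root" stores loadNum, the root entry
+     * count, and one (key, offset) pair for every loadNum-th leaf key,
+     * starting from the first.
+     */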
+    public void close() throws IOException {
+      /* two level initialize */
+      if (this.level == TWO_LEVEL_INDEX) {
+        rootCollector = new KeyOffsetCollector(this.comparator);
+      }
+
+      /* data writing phase */
+      TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap();
+      Set<Tuple> keySet = keyOffsetMap.keySet();
+
+      int entryNum = keySet.size();
+      writeHeader(entryNum);
+      
+      int loadCount = this.loadNum - 1;
+      for (Tuple key : keySet) {
+
+        if (this.level == TWO_LEVEL_INDEX) {
+          loadCount++;
+          if (loadCount == this.loadNum) {
+            rootCollector.put(key, out.getPos());
+            loadCount = 0;
+          }
+        }
+        /* key writing */
+        byte[] buf = RowStoreUtil.RowStoreEncoder.toBytes(this.keySchema, key);
+        out.writeInt(buf.length);
+        out.write(buf);
+
+        LinkedList<Long> offsetList = keyOffsetMap.get(key);
+        /* offset num writing */
+        int offsetSize = offsetList.size();
+        out.writeInt(offsetSize);
+        /* offset writing */
+        for (Long offset : offsetList) {
+          out.writeLong(offset);
+        }
+      }
+
+      out.flush();
+      out.close();
+      keySet.clear();
+      collector.clear();
+
+      FSDataOutputStream rootOut = null;
+      /* root index creating phase */
+      if (this.level == TWO_LEVEL_INDEX) {
+        TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap();
+        keySet = rootMap.keySet();
+
+        rootOut = fs.create(new Path(fileName + ".root"));
+        rootOut.writeInt(this.loadNum);
+        rootOut.writeInt(keySet.size());
+
+        /* root key writing */
+        for (Tuple key : keySet) {
+          byte[] buf = RowStoreUtil.RowStoreEncoder.toBytes(keySchema, key);
+          rootOut.writeInt(buf.length);
+          rootOut.write(buf);
+
+          LinkedList<Long> offsetList = rootMap.get(key);
+          if (offsetList.size() != 1) {
+            throw new IOException("A root index entry must have exactly one offset, but found " + offsetList.size());
+          }
+          rootOut.writeLong(offsetList.getFirst());
+
+        }
+        rootOut.flush();
+        rootOut.close();
+
+        keySet.clear();
+        rootCollector.clear();
+      }
+    }
+
+    private class KeyOffsetCollector {
+      private TreeMap<Tuple, LinkedList<Long>> map;
+
+      public KeyOffsetCollector(TupleComparator comparator) {
+        map = new TreeMap<Tuple, LinkedList<Long>>(comparator);
+      }
+
+      public void put(Tuple key, long offset) {
+        if (map.containsKey(key)) {
+          map.get(key).add(offset);
+        } else {
+          LinkedList<Long> list = new LinkedList<Long>();
+          list.add(offset);
+          map.put(key, list);
+        }
+      }
+
+      public TreeMap<Tuple, LinkedList<Long>> getMap() {
+        return this.map;
+      }
+
+      public void clear() {
+        this.map.clear();
+      }
+    }
+  }
+
+  /**
+   * BSTIndexReader is thread-safe.
+   */
+  public class BSTIndexReader implements OrderIndexReader, Closeable {
+    private Path fileName;
+    private Schema keySchema;
+    private TupleComparator comparator;
+
+    private FileSystem fs;
+    private FSDataInputStream indexIn;
+    private FSDataInputStream subIn;
+
+    private int level;
+    private int entryNum;
+    private int loadNum = -1;
+    private Tuple firstKey;
+    private Tuple lastKey;
+
+    // the cursors of BST
+    private int rootCursor;
+    private int keyCursor;
+    private int offsetCursor;
+
+    // mutex
+    private final Object mutex = new Object();
+
+    /**
+     * @param fileName the path of the index file
+     * @param keySchema the schema of the index keys
+     * @param comparator the comparator that orders the index keys
+     * @throws IOException
+     */
+    public BSTIndexReader(final Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
+      this.fileName = fileName;
+      this.keySchema = keySchema;
+      this.comparator = comparator;
+    }
+
+    public BSTIndexReader(final Path fileName) throws IOException {
+      this.fileName = fileName;
+    }
+
+    public Schema getKeySchema() {
+      return this.keySchema;
+    }
+
+    public TupleComparator getComparator() {
+      return this.comparator;
+    }
+
+    private void readHeader() throws IOException {
+      // schema
+      int schemaByteSize = indexIn.readInt();
+      byte [] schemaBytes = new byte[schemaByteSize];
+      Bytes.readFully(indexIn, schemaBytes, 0, schemaByteSize);
+
+      SchemaProto.Builder builder = SchemaProto.newBuilder();
+      builder.mergeFrom(schemaBytes);
+      SchemaProto proto = builder.build();
+      this.keySchema = new Schema(proto);
+
+      // comparator
+      int compByteSize = indexIn.readInt();
+      byte [] compBytes = new byte[compByteSize];
+      Bytes.readFully(indexIn, compBytes, 0, compByteSize);
+
+      TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder();
+      compProto.mergeFrom(compBytes);
+      this.comparator = new TupleComparator(compProto.build());
+
+      // level
+      this.level = indexIn.readInt();
+      // entry
+      this.entryNum = indexIn.readInt();
+      if (entryNum > 0) { // if there are no entries, firstKey/lastKey were not stored
+        byte [] minBytes = new byte[indexIn.readInt()];
+        Bytes.readFully(indexIn, minBytes, 0, minBytes.length);
+        this.firstKey = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, minBytes);
+
+        byte [] maxBytes = new byte[indexIn.readInt()];
+        Bytes.readFully(indexIn, maxBytes, 0, maxBytes.length);
+        this.lastKey = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, maxBytes);
+      }
+    }
+
+    public void open()
+        throws IOException {
+      /* init the index file */
+      fs = fileName.getFileSystem(conf);
+      if (!fs.exists(fileName)) {
+        throw new FileNotFoundException("ERROR: does not exist " + fileName.toString());
+      }
+
+      indexIn = fs.open(this.fileName);
+      readHeader();
+      fillData();
+    }
+
+    private void fillData() throws IOException {
+      /* load on memory */
+      if (this.level == TWO_LEVEL_INDEX) {
+
+        Path rootPath = new Path(this.fileName + ".root");
+        if (!fs.exists(rootPath)) {
+          throw new FileNotFoundException("root index did not created");
+        }
+
+        subIn = indexIn;
+        indexIn = fs.open(rootPath);
+        /* root index header reading: loadNum => entryNum */
+        this.loadNum = indexIn.readInt();
+        this.entryNum = indexIn.readInt();
+        fillRootIndex(entryNum, indexIn);
+
+      } else {
+        fillLeafIndex(entryNum, indexIn, -1);
+      }
+    }
+
+    /**
+     * Finds the offset of the entry whose key is exactly equal to the given key.
+     *
+     * @return the offset of the matching entry, or -1 if no exact match exists
+     * @throws IOException
+     */
+    public long find(Tuple key) throws IOException {
+      return find(key, false);
+    }
+
+    @Override
+    public long find(Tuple key, boolean nextKey) throws IOException {
+      synchronized (mutex) {
+        int pos = -1;
+        switch (this.level) {
+          case ONE_LEVEL_INDEX:
+            pos = oneLevBS(key);
+            break;
+          case TWO_LEVEL_INDEX:
+            pos = twoLevBS(key, this.loadNum + 1);
+            break;
+        }
+
+        if (nextKey) {
+          if (pos + 1 >= this.offsetSubIndex.length) {
+            return -1;
+          }
+          keyCursor = pos + 1;
+          offsetCursor = 0;
+        } else {
+          if (correctable) {
+            keyCursor = pos;
+            offsetCursor = 0;
+          } else {
+            return -1;
+          }
+        }
+
+        return this.offsetSubIndex[keyCursor][offsetCursor];
+      }
+    }
+
+    public long next() throws IOException {
+      synchronized (mutex) {
+        if (offsetSubIndex[keyCursor].length - 1 > offsetCursor) {
+          offsetCursor++;
+        } else {
+          if (offsetSubIndex.length - 1 > keyCursor) {
+            keyCursor++;
+            offsetCursor = 0;
+          } else {
+            if (offsetIndex.length -1 > rootCursor) {
+              rootCursor++;
+              fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]);
+              keyCursor = 1;
+              offsetCursor = 0;
+            } else {
+              return -1;
+            }
+          }
+        }
+
+        return this.offsetSubIndex[keyCursor][offsetCursor];
+      }
+    }
+    
+    public boolean isCurInMemory() {
+      return (offsetSubIndex[keyCursor].length - 1 >= offsetCursor);
+    }
+
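+    /**
+     * Loads one leaf block of the index into memory. If an IOException is
+     * thrown part-way through, the block is re-read from its start position,
+     * keeping only the entries that had already been read successfully.
+     */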
+    private void fillLeafIndex(int entryNum, FSDataInputStream in, long pos)
+        throws IOException {
+      int counter = 0;
+      try {
+        if (pos != -1) {
+          in.seek(pos);
+        }
+        this.dataSubIndex = new Tuple[entryNum];
+        this.offsetSubIndex = new long[entryNum][];
+
+        byte[] buf;
+
+        for (int i = 0; i < entryNum; i++) {
+          counter++;
+          buf = new byte[in.readInt()];
+          Bytes.readFully(in, buf, 0, buf.length);
+          dataSubIndex[i] = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
+
+          int offsetNum = in.readInt();
+          this.offsetSubIndex[i] = new long[offsetNum];
+          for (int j = 0; j < offsetNum; j++) {
+            this.offsetSubIndex[i][j] = in.readLong();
+          }
+
+        }
+
+      } catch (IOException e) {
+        counter--;
+        if (pos != -1) {
+          in.seek(pos);
+        }
+        this.dataSubIndex = new Tuple[counter];
+        this.offsetSubIndex = new long[counter][];
+
+        byte[] buf;
+        for (int i = 0; i < counter; i++) {
+          buf = new byte[in.readInt()];
+          Bytes.readFully(in, buf, 0, buf.length);
+          dataSubIndex[i] = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
+
+          int offsetNum = in.readInt();
+          this.offsetSubIndex[i] = new long[offsetNum];
+          for (int j = 0; j < offsetNum; j++) {
+            this.offsetSubIndex[i][j] = in.readLong();
+          }
+
+        }
+      }
+    }
+
+    public Tuple getFirstKey() {
+      return this.firstKey;
+    }
+
+    public Tuple getLastKey() {
+      return this.lastKey;
+    }
+
+    private void fillRootIndex(int entryNum, FSDataInputStream in)
+        throws IOException {
+      this.dataIndex = new Tuple[entryNum];
+      this.offsetIndex = new long[entryNum];
+      Tuple keyTuple;
+      byte[] buf;
+      for (int i = 0; i < entryNum; i++) {
+        buf = new byte[in.readInt()];
+        Bytes.readFully(in, buf, 0, buf.length);
+        keyTuple = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
+        dataIndex[i] = keyTuple;
+        this.offsetIndex[i] = in.readLong();
+      }
+    }
+
+    /* memory index, only one is used. */
+    private Tuple[] dataIndex = null;
+    private Tuple[] dataSubIndex = null;
+
+    /* offset index */
+    private long[] offsetIndex = null;
+    private long[][] offsetSubIndex = null;
+
+    private boolean correctable = true;
+
+    private int oneLevBS(Tuple key) throws IOException {
+      correctable = true;
+      int pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
+      return pos;
+    }
+
+    private int twoLevBS(Tuple key, int loadNum) throws IOException {
+      int pos = binarySearch(this.dataIndex, key, 0, this.dataIndex.length);
+      if(pos > 0) {
+        rootCursor = pos;
+      } else {
+        rootCursor = 0;
+      }
+      fillLeafIndex(loadNum, subIn, this.offsetIndex[rootCursor]);
+      pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
+       
+      return pos;
+    }
+
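+    /**
+     * Binary search over the in-memory key array. On an exact match, sets
+     * correctable to true and returns the match position; otherwise sets
+     * correctable to false and returns the position of the greatest key
+     * smaller than the given key, or -1 if the key precedes every entry.
+     */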
+    private int binarySearch(Tuple[] arr, Tuple key, int startPos, int endPos) {
+      int offset = -1;
+      int start = startPos;
+      int end = endPos;
+
+      //http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541
+      int centerPos = (start + end) >>> 1;
+      while (true) {
+        if (comparator.compare(arr[centerPos], key) > 0) {
+          if (centerPos == 0) {
+            correctable = false;
+            break;
+          } else if (comparator.compare(arr[centerPos - 1], key) < 0) {
+            correctable = false;
+            offset = centerPos - 1;
+            break;
+          } else {
+            end = centerPos;
+            centerPos = (start + end) / 2;
+          }
+        } else if (comparator.compare(arr[centerPos], key) < 0) {
+          if (centerPos == arr.length - 1) {
+            correctable = false;
+            offset = centerPos;
+            break;
+          } else if (comparator.compare(arr[centerPos + 1], key) > 0) {
+            correctable = false;
+            offset = centerPos;
+            break;
+          } else {
+            start = centerPos + 1;
+            centerPos = (start + end) / 2;
+          }
+        } else {
+          correctable = true;
+          offset = centerPos;
+          break;
+        }
+      }
+      return offset;
+    }
+
+    @Override
+    public void close() throws IOException {
+      this.indexIn.close();
+      if (this.subIn != null) { // subIn is only opened for two-level indexes
+        this.subIn.close();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "BSTIndex (" + firstKey + ", " + lastKey + ") " + fileName;
+    }
+  }
+}
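
Because the writer/reader handshake above is spread across several methods, an end-to-end usage sketch may help (editor's illustration, not code from this patch). It assumes the caller has already built keySchema, comparator, and the (key, offset) entries, and that the usual imports (those of BSTIndex.java plus java.util.Map) are in place.

  public static long buildAndProbe(Configuration conf, Path idxPath, Schema keySchema,
                                   TupleComparator comparator, Map<Tuple, Long> entries,
                                   Tuple probeKey) throws IOException {
    BSTIndex bst = new BSTIndex(conf);

    BSTIndex.BSTIndexWriter writer =
        bst.getIndexWriter(idxPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comparator);
    writer.setLoadNum(1024);               // leaf entries covered by one root entry
    writer.open();                         // fails if idxPath already exists
    for (Map.Entry<Tuple, Long> e : entries.entrySet()) {
      writer.write(e.getKey(), e.getValue());
    }
    writer.close();                        // writes idxPath and idxPath + ".root"

    BSTIndex.BSTIndexReader reader = bst.getIndexReader(idxPath, keySchema, comparator);
    reader.open();
    try {
      return reader.find(probeKey);        // -1 when no exact match exists
    } finally {
      reader.close();
    }
  }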