Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:54 UTC

[17/18] TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
deleted file mode 100644
index 064841f..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-
-import java.io.IOException;
-
-public abstract class FileAppender implements Appender {
-  protected boolean inited = false;
-
-  protected final Configuration conf;
-  protected final TableMeta meta;
-  protected final Schema schema;
-  protected final Path path;
-
-  protected boolean enabledStats;
-  
-  public FileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) {
-    this.conf = conf;
-    this.meta = meta;
-    this.schema = schema;
-    this.path = path;
-  }
-
-  public void init() throws IOException {
-    if (inited) {
-     throw new IllegalStateException("FileAppender is already initialized.");
-    }
-    inited = true;
-  }
-
-  public void enableStats() {
-    if (inited) {
-      throw new IllegalStateException("Should enable this option before init()");
-    }
-
-    this.enabledStats = true;
-  }
-
-  public abstract long getOffset() throws IOException;
-}
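
A note for readers following the move: FileAppender mostly pins down a call-order contract. Statistics collection has to be switched on before init(), and init() may only run once. A minimal sketch of that contract, assuming some concrete subclass of FileAppender obtained elsewhere (the helper method below is illustrative, not part of this commit; types are the ones imported in the file above):

  // Hedged sketch: only the methods declared on FileAppender above are used.
  static long initializeAppender(FileAppender appender) throws IOException {
    appender.enableStats();       // legal only before init(); afterwards it throws IllegalStateException
    appender.init();              // marks the appender initialized; a second init() also throws
    return appender.getOffset();  // concrete subclasses report the current write position
  }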

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
deleted file mode 100644
index c831822..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-
-public abstract class FileScanner implements Scanner {
-  private static final Log LOG = LogFactory.getLog(FileScanner.class);
-
-  protected boolean inited = false;
-  protected final Configuration conf;
-  protected final TableMeta meta;
-  protected final Schema schema;
-  protected final FileFragment fragment;
-  protected final int columnNum;
-
-  protected Column [] targets;
-  
-  public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) {
-    this.conf = conf;
-    this.meta = meta;
-    this.schema = schema;
-    this.fragment = fragment;
-    this.columnNum = this.schema.getColumnNum();
-  }
-
-  public void init() throws IOException {
-    inited = true;
-  }
-
-  @Override
-  public Schema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public void setTarget(Column[] targets) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-    this.targets = targets;
-  }
-
-  public void setSearchCondition(Object expr) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-  }
-
-  public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException {
-    String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME);
-    FileSystem fs;
-    if(tajoUser != null) {
-      try {
-        fs = FileSystem.get(path.toUri(), tajoConf, tajoUser);
-      } catch (InterruptedException e) {
-        LOG.warn("Occur InterruptedException while FileSystem initiating with user[" + tajoUser + "]");
-        fs = FileSystem.get(path.toUri(), tajoConf);
-      }
-    } else {
-      fs = FileSystem.get(path.toUri(), tajoConf);
-    }
-
-    return fs;
-  }
-}
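
The concrete behaviour worth calling out here is the static getFileSystem() helper: it tries to open the FileSystem as the configured Tajo user and falls back to the default user if impersonation is interrupted. A small self-contained sketch (the table path is purely illustrative):

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.tajo.conf.TajoConf;
  import org.apache.tajo.storage.FileScanner;

  import java.io.IOException;

  public class GetFileSystemExample {
    public static void main(String[] args) throws IOException {
      TajoConf conf = new TajoConf();
      Path tablePath = new Path("hdfs:///tajo/warehouse/mytable");  // illustrative location
      // Resolved as the configured Tajo user when possible; otherwise the default user.
      FileSystem fs = FileScanner.getFileSystem(conf, tablePath);
      System.out.println(fs.getUri());
    }
  }

Also visible in the class above: setTarget() and setSearchCondition() must be called before init(); both throw IllegalStateException afterwards.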

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
deleted file mode 100644
index f05a316..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.UnimplementedException;
-import org.apache.tajo.exception.UnsupportedException;
-
-import java.net.InetAddress;
-
-/**
- * An instance of FrameTuple is an immutable tuple.
- * It contains two tuples and pretends to be one instance of Tuple for
- * join qual evaluations.
- */
-public class FrameTuple implements Tuple, Cloneable {
-  private int size;
-  private int leftSize;
-  
-  private Tuple left;
-  private Tuple right;
-  
-  public FrameTuple() {}
-  
-  public FrameTuple(Tuple left, Tuple right) {
-    set(left, right);
-  }
-  
-  public void set(Tuple left, Tuple right) {
-    this.size = left.size() + right.size();
-    this.left = left;
-    this.leftSize = left.size();
-    this.right = right;
-  }
-
-  @Override
-  public int size() {
-    return size;
-  }
-
-  @Override
-  public boolean contains(int fieldId) {
-    Preconditions.checkArgument(fieldId < size, 
-        "Out of field access: " + fieldId);
-    
-    if (fieldId < leftSize) {
-      return left.contains(fieldId);
-    } else {
-      return right.contains(fieldId - leftSize);
-    }
-  }
-
-  @Override
-  public boolean isNull(int fieldid) {
-    return get(fieldid) instanceof NullDatum;
-  }
-
-  @Override
-  public void clear() {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public void put(int fieldId, Datum value) {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public void put(int fieldId, Datum[] values) {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public void setOffset(long offset) {
-    throw new UnsupportedException();
-  }
-  
-  @Override
-  public long getOffset() {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public void put(Datum [] values) {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public Datum get(int fieldId) {
-    Preconditions.checkArgument(fieldId < size, 
-        "Out of field access: " + fieldId);
-    
-    if (fieldId < leftSize) {
-      return left.get(fieldId);
-    } else {
-      return right.get(fieldId - leftSize);
-    }
-  }
-
-  @Override
-  public BooleanDatum getBoolean(int fieldId) {
-    return (BooleanDatum) get(fieldId);
-  }
-
-  @Override
-  public BitDatum getByte(int fieldId) {
-    return (BitDatum) get(fieldId);
-  }
-
-  @Override
-  public CharDatum getChar(int fieldId) {
-    return (CharDatum) get(fieldId);
-  }
-
-  @Override
-  public BlobDatum getBytes(int fieldId) {
-    return (BlobDatum) get(fieldId);
-  }
-
-  @Override
-  public Int2Datum getShort(int fieldId) {
-    return (Int2Datum) get(fieldId);
-  }
-
-  @Override
-  public Int4Datum getInt(int fieldId) {
-    return (Int4Datum) get(fieldId);
-  }
-
-  @Override
-  public Int8Datum getLong(int fieldId) {
-    return (Int8Datum) get(fieldId);
-  }
-
-  @Override
-  public Float4Datum getFloat(int fieldId) {
-    return (Float4Datum) get(fieldId);
-  }
-
-  @Override
-  public Float8Datum getDouble(int fieldId) {
-    return (Float8Datum) get(fieldId);
-  }
-
-  @Override
-  public Inet4Datum getIPv4(int fieldId) {
-    return (Inet4Datum) get(fieldId);
-  }
-
-  @Override
-  public byte[] getIPv4Bytes(int fieldId) { 
-    return get(fieldId).asByteArray();
-  }
-
-  @Override
-  public InetAddress getIPv6(int fieldId) {
-    throw new UnimplementedException();
-  }
-  
-  @Override
-  public byte[] getIPv6Bytes(int fieldId) {
-    throw new UnimplementedException();
-  }
-
-  @Override
-  public TextDatum getString(int fieldId) {
-    return (TextDatum) get(fieldId);
-  }
-
-  @Override
-  public TextDatum getText(int fieldId) {
-    return (TextDatum) get(fieldId);
-  }
-
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    FrameTuple frameTuple = (FrameTuple) super.clone();
-    frameTuple.set(this.left.clone(), this.right.clone());
-    return frameTuple;
-  }
-
-  @Override
-  public Datum[] getValues(){
-    throw new UnsupportedException();
-  }
-
-  public String toString() {
-    boolean first = true;
-    StringBuilder str = new StringBuilder();
-    str.append("(");
-    for(int i=0; i < size(); i++) {      
-      if(contains(i)) {
-        if(first) {
-          first = false;
-        } else {
-          str.append(", ");
-        }
-        str.append(i)
-        .append("=>")
-        .append(get(i));
-      }
-    }
-    str.append(")");
-    return str.toString();
-  }
-}
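
FrameTuple is the read-only view used during join processing: it stitches two tuples together and answers field accesses by delegating to the left or right side. A short sketch, using the VTuple and DatumFactory helpers from this same module (the field values are illustrative):

  import org.apache.tajo.datum.DatumFactory;
  import org.apache.tajo.storage.FrameTuple;
  import org.apache.tajo.storage.Tuple;
  import org.apache.tajo.storage.VTuple;

  public class FrameTupleExample {
    public static void main(String[] args) {
      Tuple left = new VTuple(2);
      left.put(0, DatumFactory.createInt4(1));
      left.put(1, DatumFactory.createText("left"));

      Tuple right = new VTuple(1);
      right.put(0, DatumFactory.createText("right"));

      // The two tuples now look like a single 3-field row.
      FrameTuple joined = new FrameTuple(left, right);
      System.out.println(joined.size());  // 3
      System.out.println(joined.get(2));  // delegates to right.get(0)
      // Mutators such as put(...) and clear() throw UnsupportedException: the view is read-only.
    }
  }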

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
deleted file mode 100644
index 4d484df..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.datum.exception.InvalidCastException;
-
-import java.net.InetAddress;
-import java.util.Arrays;
-
-public class LazyTuple implements Tuple, Cloneable {
-  private long offset;
-  private Datum[] values;
-  private byte[][] textBytes;
-  private Schema schema;
-  private byte[] nullBytes;
-  private SerializerDeserializer serializeDeserialize;
-
-  public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
-    this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
-  }
-
-  public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
-    this.schema = schema;
-    this.textBytes = textBytes;
-    this.values = new Datum[schema.getColumnNum()];
-    this.offset = offset;
-    this.nullBytes = nullBytes;
-    this.serializeDeserialize = serde;
-  }
-
-  public LazyTuple(LazyTuple tuple) {
-    this.values = tuple.getValues();
-    this.offset = tuple.offset;
-    this.schema = tuple.schema;
-    this.textBytes = new byte[size()][];
-    this.nullBytes = tuple.nullBytes;
-    this.serializeDeserialize = tuple.serializeDeserialize;
-  }
-
-  @Override
-  public int size() {
-    return values.length;
-  }
-
-  @Override
-  public boolean contains(int fieldid) {
-    return textBytes[fieldid] != null || values[fieldid] != null;
-  }
-
-  @Override
-  public boolean isNull(int fieldid) {
-    return get(fieldid) instanceof NullDatum;
-  }
-
-  @Override
-  public void clear() {
-    for (int i = 0; i < values.length; i++) {
-      values[i] = null;
-      textBytes[i] = null;
-    }
-  }
-
-  //////////////////////////////////////////////////////
-  // Setter
-  //////////////////////////////////////////////////////
-  @Override
-  public void put(int fieldId, Datum value) {
-    values[fieldId] = value;
-    textBytes[fieldId] = null;
-  }
-
-  @Override
-  public void put(int fieldId, Datum[] values) {
-    for (int i = fieldId, j = 0; j < values.length; i++, j++) {
-      this.values[i] = values[j];
-    }
-    this.textBytes = new byte[values.length][];
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
-      values[i] = tuple.get(j);
-      textBytes[i] = null;
-    }
-  }
-
-  @Override
-  public void put(Datum[] values) {
-    System.arraycopy(values, 0, this.values, 0, size());
-    this.textBytes = new byte[values.length][];
-  }
-
-  //////////////////////////////////////////////////////
-  // Getter
-  //////////////////////////////////////////////////////
-  @Override
-  public Datum get(int fieldId) {
-    if (values[fieldId] != null)
-      return values[fieldId];
-    else if (textBytes.length <= fieldId) {
-      values[fieldId] = NullDatum.get();  // split error. (col : 3, separator: ',', row text: "a,")
-    } else if (textBytes[fieldId] != null) {
-      try {
-        values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
-            textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
-      } catch (Exception e) {
-        values[fieldId] = NullDatum.get();
-      }
-      textBytes[fieldId] = null;
-    } else {
-      //non-projection
-    }
-    return values[fieldId];
-  }
-
-  @Override
-  public void setOffset(long offset) {
-    this.offset = offset;
-  }
-
-  @Override
-  public long getOffset() {
-    return this.offset;
-  }
-
-  @Override
-  public BooleanDatum getBoolean(int fieldId) {
-    return (BooleanDatum) get(fieldId);
-  }
-
-  @Override
-  public BitDatum getByte(int fieldId) {
-    return (BitDatum) get(fieldId);
-  }
-
-  @Override
-  public CharDatum getChar(int fieldId) {
-    return (CharDatum) get(fieldId);
-  }
-
-  @Override
-  public BlobDatum getBytes(int fieldId) {
-    return (BlobDatum) get(fieldId);
-  }
-
-  @Override
-  public Int2Datum getShort(int fieldId) {
-    return (Int2Datum) get(fieldId);
-  }
-
-  @Override
-  public Int4Datum getInt(int fieldId) {
-    return (Int4Datum) get(fieldId);
-  }
-
-  @Override
-  public Int8Datum getLong(int fieldId) {
-    return (Int8Datum) get(fieldId);
-  }
-
-  @Override
-  public Float4Datum getFloat(int fieldId) {
-    return (Float4Datum) get(fieldId);
-  }
-
-  @Override
-  public Float8Datum getDouble(int fieldId) {
-    return (Float8Datum) get(fieldId);
-  }
-
-  @Override
-  public Inet4Datum getIPv4(int fieldId) {
-    return (Inet4Datum) get(fieldId);
-  }
-
-  @Override
-  public byte[] getIPv4Bytes(int fieldId) {
-    return get(fieldId).asByteArray();
-  }
-
-  @Override
-  public InetAddress getIPv6(int fieldId) {
-    throw new InvalidCastException("IPv6 is unsupported yet");
-  }
-
-  @Override
-  public byte[] getIPv6Bytes(int fieldId) {
-    throw new InvalidCastException("IPv6 is unsupported yet");
-  }
-
-  @Override
-  public TextDatum getString(int fieldId) {
-    return (TextDatum) get(fieldId);
-  }
-
-  @Override
-  public TextDatum getText(int fieldId) {
-    return (TextDatum) get(fieldId);
-  }
-
-  public byte[] getTextBytes(int fieldId) {
-    if(textBytes[fieldId] != null)
-      return textBytes[fieldId];
-    else {
-      return get(fieldId).asTextBytes();
-    }
-  }
-
-  public String toString() {
-    boolean first = true;
-    StringBuilder str = new StringBuilder();
-    str.append("(");
-    Datum d;
-    for (int i = 0; i < values.length; i++) {
-      d = get(i);
-      if (d != null) {
-        if (first) {
-          first = false;
-        } else {
-          str.append(", ");
-        }
-        str.append(i)
-            .append("=>")
-            .append(d);
-      }
-    }
-    str.append(")");
-    return str.toString();
-  }
-
-  @Override
-  public int hashCode() {
-    int hashCode = 37;
-    for (int i = 0; i < values.length; i++) {
-      Datum d = get(i);
-      if (d != null) {
-        hashCode ^= (d.hashCode() * 41);
-      } else {
-        hashCode = hashCode ^ (i + 17);
-      }
-    }
-
-    return hashCode;
-  }
-
-  @Override
-  public Datum[] getValues() {
-    Datum[] datums = new Datum[values.length];
-    for (int i = 0; i < values.length; i++) {
-      datums[i] = get(i);
-    }
-    return datums;
-  }
-
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    LazyTuple lazyTuple = (LazyTuple) super.clone();
-
-    lazyTuple.values = getValues(); //shallow copy
-    lazyTuple.textBytes = new byte[size()][];
-    return lazyTuple;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof Tuple) {
-      Tuple other = (Tuple) obj;
-      return Arrays.equals(getValues(), other.getValues());
-    }
-    return false;
-  }
-}
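
LazyTuple defers parsing: it keeps the raw text bytes of a row and only deserializes a column the first time it is read, caching the resulting Datum. A small sketch; Schema.addColumn(name, type) is the catalog API used elsewhere in Tajo at this point, and the row contents are illustrative:

  import org.apache.tajo.catalog.Schema;
  import org.apache.tajo.common.TajoDataTypes.Type;
  import org.apache.tajo.storage.LazyTuple;

  public class LazyTupleExample {
    public static void main(String[] args) {
      Schema schema = new Schema();
      schema.addColumn("id", Type.INT4);
      schema.addColumn("name", Type.TEXT);

      // Raw text bytes of one row; nothing is parsed yet.
      byte[][] row = new byte[][] { "42".getBytes(), "tajo".getBytes() };
      LazyTuple tuple = new LazyTuple(schema, row, 0L);

      // First access deserializes the field (TextSerializerDeserializer by default)
      // and caches it; later calls reuse the cached Datum.
      System.out.println(tuple.getInt(0));
      System.out.println(tuple.getText(1));
    }
  }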

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
deleted file mode 100644
index 66c610a..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-
-/**
- * A class that provides a line reader from an input stream.
- * Depending on the constructor used, lines will either be terminated by:
- * <ul>
- * <li>one of the following: '\n' (LF) , '\r' (CR),
- * or '\r\n' (CR+LF).</li>
- * <li><em>or</em>, a custom byte sequence delimiter</li>
- * </ul>
- * In both cases, EOF also terminates an otherwise unterminated
- * line.
- */
-
-public class LineReader implements Closeable {
-  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-  private int bufferSize = DEFAULT_BUFFER_SIZE;
-  private InputStream in;
-  private byte[] buffer;
-  // the number of bytes of real data in the buffer
-  private int bufferLength = 0;
-  // the current position in the buffer
-  private int bufferPosn = 0;
-
-  private static final byte CR = '\r';
-  private static final byte LF = '\n';
-
-  // The line delimiter
-  private final byte[] recordDelimiterBytes;
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * default buffer-size (64k).
-   *
-   * @param in The input stream
-   * @throws IOException
-   */
-  public LineReader(InputStream in) {
-    this(in, DEFAULT_BUFFER_SIZE);
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * given buffer-size.
-   *
-   * @param in         The input stream
-   * @param bufferSize Size of the read buffer
-   * @throws IOException
-   */
-  public LineReader(InputStream in, int bufferSize) {
-    this.in = in;
-    this.bufferSize = bufferSize;
-    this.buffer = new byte[this.bufferSize];
-    this.recordDelimiterBytes = null;
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * <code>io.file.buffer.size</code> specified in the given
-   * <code>Configuration</code>.
-   *
-   * @param in   input stream
-   * @param conf configuration
-   * @throws IOException
-   */
-  public LineReader(InputStream in, Configuration conf) throws IOException {
-    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * default buffer-size, and using a custom delimiter of array of
-   * bytes.
-   *
-   * @param in                   The input stream
-   * @param recordDelimiterBytes The delimiter
-   */
-  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
-    this.in = in;
-    this.bufferSize = DEFAULT_BUFFER_SIZE;
-    this.buffer = new byte[this.bufferSize];
-    this.recordDelimiterBytes = recordDelimiterBytes;
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * given buffer-size, and using a custom delimiter of array of
-   * bytes.
-   *
-   * @param in                   The input stream
-   * @param bufferSize           Size of the read buffer
-   * @param recordDelimiterBytes The delimiter
-   * @throws IOException
-   */
-  public LineReader(InputStream in, int bufferSize,
-                    byte[] recordDelimiterBytes) {
-    this.in = in;
-    this.bufferSize = bufferSize;
-    this.buffer = new byte[this.bufferSize];
-    this.recordDelimiterBytes = recordDelimiterBytes;
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * <code>io.file.buffer.size</code> specified in the given
-   * <code>Configuration</code>, and using a custom delimiter of array of
-   * bytes.
-   *
-   * @param in                   input stream
-   * @param conf                 configuration
-   * @param recordDelimiterBytes The delimiter
-   * @throws IOException
-   */
-  public LineReader(InputStream in, Configuration conf,
-                    byte[] recordDelimiterBytes) throws IOException {
-    this.in = in;
-    this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
-    this.buffer = new byte[this.bufferSize];
-    this.recordDelimiterBytes = recordDelimiterBytes;
-  }
-
-
-  /**
-   * Close the underlying stream.
-   *
-   * @throws IOException
-   */
-  public void close() throws IOException {
-    in.close();
-  }
-
-  public void reset() {
-    bufferLength = 0;
-    bufferPosn = 0;
-
-  }
-
-  /**
-   * Read one line from the InputStream into the given Text.
-   *
-   * @param str               the object to store the given line (without newline)
-   * @param maxLineLength     the maximum number of bytes to store into str;
-   *                          the rest of the line is silently discarded.
-   * @param maxBytesToConsume the maximum number of bytes to consume
-   *                          in this call.  This is only a hint, because if the line crosses
-   *                          this threshold, we allow it to happen.  It can overshoot
-   *                          potentially by as much as one buffer length.
-   * @return the number of bytes read including the (longest) newline
-   *         found.
-   * @throws IOException if the underlying stream throws
-   */
-  public int readLine(Text str, int maxLineLength,
-                      int maxBytesToConsume) throws IOException {
-    if (this.recordDelimiterBytes != null) {
-      return readCustomLine(str, maxLineLength, maxBytesToConsume);
-    } else {
-      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
-    }
-  }
-
-  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
-      throws IOException {
-    return in.read(buffer);
-  }
-  /**
-   * Read a line terminated by one of CR, LF, or CRLF.
-   */
-  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-    /* We're reading data from in, but the head of the stream may be
-     * already buffered in buffer, so we have several cases:
-     * 1. No newline characters are in the buffer, so we need to copy
-     *    everything and read another buffer from the stream.
-     * 2. An unambiguously terminated line is in buffer, so we just
-     *    copy to str.
-     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-     *    in CR.  In this case we copy everything up to CR to str, but
-     *    we also need to see what follows CR: if it's LF, then we
-     *    need consume LF as well, so next call to readLine will read
-     *    from after that.
-     * We use a flag prevCharCR to signal if previous character was CR
-     * and, if it happens to be at the end of the buffer, delay
-     * consuming it until we have a chance to look at the char that
-     * follows.
-     */
-    str.clear();
-    int txtLength = 0; //tracks str.getLength(), as an optimization
-    int newlineLength = 0; //length of terminating newline
-    boolean prevCharCR = false; //true if prev char was CR
-    long bytesConsumed = 0;
-    do {
-      int startPosn = bufferPosn; //starting from where we left off the last time
-      if (bufferPosn >= bufferLength) {
-        startPosn = bufferPosn = 0;
-        if (prevCharCR) {
-          ++bytesConsumed; //account for CR from previous read
-        }
-        bufferLength = fillBuffer(in, buffer, prevCharCR);
-        if (bufferLength <= 0) {
-          break; // EOF
-        }
-      }
-      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-        if (buffer[bufferPosn] == LF) {
-          newlineLength = (prevCharCR) ? 2 : 1;
-          ++bufferPosn; // at next invocation proceed from following byte
-          break;
-        }
-        if (prevCharCR) { //CR + notLF, we are at notLF
-          newlineLength = 1;
-          break;
-        }
-        prevCharCR = (buffer[bufferPosn] == CR);
-      }
-      int readLength = bufferPosn - startPosn;
-      if (prevCharCR && newlineLength == 0) {
-        --readLength; //CR at the end of the buffer
-      }
-      bytesConsumed += readLength;
-      int appendLength = readLength - newlineLength;
-      if (appendLength > maxLineLength - txtLength) {
-        appendLength = maxLineLength - txtLength;
-      }
-      if (appendLength > 0) {
-        str.append(buffer, startPosn, appendLength);
-        txtLength += appendLength;
-      }
-    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
-
-    if (bytesConsumed > (long) Integer.MAX_VALUE) {
-      throw new IOException("Too many bytes before newline: " + bytesConsumed);
-    }
-    return (int) bytesConsumed;
-  }
-
-  /**
-   * Read a line terminated by one of CR, LF, or CRLF.
-   */
-  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
-      , int maxBytesToConsume)
-      throws IOException {
-    /* We're reading data from in, but the head of the stream may be
-     * already buffered in buffer, so we have several cases:
-     * 1. No newline characters are in the buffer, so we need to copy
-     *    everything and read another buffer from the stream.
-     * 2. An unambiguously terminated line is in buffer, so we just
-     *    copy to str.
-     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-     *    in CR.  In this case we copy everything up to CR to str, but
-     *    we also need to see what follows CR: if it's LF, then we
-     *    need consume LF as well, so next call to readLine will read
-     *    from after that.
-     * We use a flag prevCharCR to signal if previous character was CR
-     * and, if it happens to be at the end of the buffer, delay
-     * consuming it until we have a chance to look at the char that
-     * follows.
-     */
-
-    int txtLength = 0; //tracks str.getLength(), as an optimization
-    int newlineLength = 0; //length of terminating newline
-    boolean prevCharCR = false; //true if prev char was CR
-    long bytesConsumed = 0;
-    do {
-      int startPosn = bufferPosn; //starting from where we left off the last time
-      if (bufferPosn >= bufferLength) {
-        startPosn = bufferPosn = 0;
-        if (prevCharCR) {
-          ++bytesConsumed; //account for CR from previous read
-        }
-        bufferLength = fillBuffer(in, buffer, prevCharCR);
-        if (bufferLength <= 0) {
-          break; // EOF
-        }
-      }
-      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-        if (buffer[bufferPosn] == LF) {
-          newlineLength = (prevCharCR) ? 2 : 1;
-          ++bufferPosn; // at next invocation proceed from following byte
-          break;
-        }
-        if (prevCharCR) { //CR + notLF, we are at notLF
-          newlineLength = 1;
-          break;
-        }
-        prevCharCR = (buffer[bufferPosn] == CR);
-      }
-      int readLength = bufferPosn - startPosn;
-      if (prevCharCR && newlineLength == 0) {
-        --readLength; //CR at the end of the buffer
-      }
-      bytesConsumed += readLength;
-      int appendLength = readLength - newlineLength;
-      if (appendLength > maxLineLength - txtLength) {
-        appendLength = maxLineLength - txtLength;
-      }
-      if (appendLength > 0) {
-        str.write(buffer, startPosn, appendLength);
-        txtLength += appendLength;
-      }
-    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
-
-    if (bytesConsumed > (long) Integer.MAX_VALUE) {
-      throw new IOException("Too many bytes before newline: " + bytesConsumed);
-    }
-
-    if (bytesConsumed > 0) offsets.add(txtLength);
-    return (int) bytesConsumed;
-  }
-
-  /**
-   * Read a line terminated by one of CR, LF, or CRLF.
-   */
-
-/*  int validIdx = 0;
-  public int readDefaultLines(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, ArrayList<Long> foffsets,
-                             long pos, int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-    *//* We're reading data from in, but the head of the stream may be
-     * already buffered in buffer, so we have several cases:
-     * 1. No newline characters are in the buffer, so we need to copy
-     *    everything and read another buffer from the stream.
-     * 2. An unambiguously terminated line is in buffer, so we just
-     *    copy to str.
-     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-     *    in CR.  In this case we copy everything up to CR to str, but
-     *    we also need to see what follows CR: if it's LF, then we
-     *    need consume LF as well, so next call to readLine will read
-     *    from after that.
-     * We use a flag prevCharCR to signal if previous character was CR
-     * and, if it happens to be at the end of the buffer, delay
-     * consuming it until we have a chance to look at the char that
-     * follows.
-     *//*
-    //str.clear();
-    str.reset();
-    offsets.clear();
-    foffsets.clear();
-
-    validIdx = 0;
-    long bufferBytesConsumed = 0;
-
-    int txtLength = 0; //tracks str.getLength(), as an optimization
-    int newlineLength = 0; //length of terminating newline
-    boolean prevCharCR = false; //true if prev char was CR
-    long bytesConsumed = 0;
-    do {
-
-      int startPosn = bufferPosn; //starting from where we left off the last time
-      if (bufferPosn >= bufferLength) {
-        startPosn = bufferPosn = 0;
-        if (prevCharCR) {
-          ++bytesConsumed; //account for CR from previous read
-        }
-        bufferLength = in.read(buffer);
-        if (bufferLength <= 0) {
-          break; // EOF
-        }
-      }
-      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-        if (buffer[bufferPosn] == LF) {
-          newlineLength = (prevCharCR) ? 2 : 1;
-          ++bufferPosn; // at next invocation proceed from following byte
-          break;
-        }
-        if (prevCharCR) { //CR + notLF, we are at notLF
-          newlineLength = 1;
-          break;
-        }
-        prevCharCR = (buffer[bufferPosn] == CR);
-      }
-      int readLength = bufferPosn - startPosn;
-      if (prevCharCR && newlineLength == 0) {
-        --readLength; //CR at the end of the buffer
-      }
-      bytesConsumed += readLength;
-      int appendLength = readLength - newlineLength;
-      if (appendLength > maxLineLength - txtLength) {
-        appendLength = maxLineLength - txtLength;
-      }
-
-      if (appendLength > 0) {
-        str.write(buffer, startPosn, appendLength);
-        //System.out.println(startPosn + "," + appendLength);
-        //str.append(buffer, startPosn, appendLength);
-        txtLength += appendLength;
-      }
-
-      if(newlineLength > 0){
-        validIdx++;
-
-        if (bytesConsumed > (long)Integer.MAX_VALUE) {
-          throw new IOException("Too many bytes before newline: " + bytesConsumed);
-        }
-        offsets.add(txtLength);
-        foffsets.add(pos);
-        pos+= bytesConsumed;
-        bufferBytesConsumed += bytesConsumed;
-
-        txtLength = 0;
-        newlineLength = 0;
-        prevCharCR = false; //true if prev char was CR
-        bytesConsumed = 0;
-      } else {
-        bufferBytesConsumed += bytesConsumed;
-        bytesConsumed = 0;
-      }
-    } while ((bufferBytesConsumed < 256 * 1024));
-
-    return (int)bufferBytesConsumed;
-  }*/
-
-  /**
-   * Read a line terminated by a custom delimiter.
-   */
-  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-   /* We're reading data from inputStream, but the head of the stream may be
-    *  already captured in the previous buffer, so we have several cases:
-    *
-    * 1. The buffer tail does not contain any character sequence which
-    *    matches with the head of delimiter. We count it as an
-    *    ambiguous byte count = 0
-    *
-    * 2. The buffer tail contains a X number of characters,
-    *    that forms a sequence, which matches with the
-    *    head of delimiter. We count ambiguous byte count = X
-    *
-    *    // ***  eg: A segment of input file is as follows
-    *
-    *    " record 1792: I found this bug very interesting and
-    *     I have completely read about it. record 1793: This bug
-    *     can be solved easily record 1794: This ."
-    *
-    *    delimiter = "record";
-    *
-    *    supposing:- String at the end of buffer =
-    *    "I found this bug very interesting and I have completely re"
-    *    Therefore, next buffer = "ad about it. record 179       ...."
-    *
-    *     The matching characters in the input
-    *     buffer tail and delimiter head = "re"
-    *     Therefore, ambiguous byte count = 2 ****   //
-    *
-    *     2.1 If the following bytes are the remaining characters of
-    *         the delimiter, then we have to capture only up to the starting
-    *         position of delimiter. That means, we need not include the
-    *         ambiguous characters in str.
-    *
-    *     2.2 If the following bytes are not the remaining characters of
-    *         the delimiter ( as mentioned in the example ),
-    *         then we have to include the ambiguous characters in str.
-    */
-    str.clear();
-    int txtLength = 0; // tracks str.getLength(), as an optimization
-    long bytesConsumed = 0;
-    int delPosn = 0;
-    int ambiguousByteCount = 0; // To capture the ambiguous characters count
-    do {
-      int startPosn = bufferPosn; // Start from previous end position
-      if (bufferPosn >= bufferLength) {
-        startPosn = bufferPosn = 0;
-        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
-        if (bufferLength <= 0) {
-          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
-          break; // EOF
-        }
-      }
-      for (; bufferPosn < bufferLength; ++bufferPosn) {
-        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
-          delPosn++;
-          if (delPosn >= recordDelimiterBytes.length) {
-            bufferPosn++;
-            break;
-          }
-        } else if (delPosn != 0) {
-          bufferPosn--;
-          delPosn = 0;
-        }
-      }
-      int readLength = bufferPosn - startPosn;
-      bytesConsumed += readLength;
-      int appendLength = readLength - delPosn;
-      if (appendLength > maxLineLength - txtLength) {
-        appendLength = maxLineLength - txtLength;
-      }
-      if (appendLength > 0) {
-        if (ambiguousByteCount > 0) {
-          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
-          //appending the ambiguous characters (refer case 2.2)
-          bytesConsumed += ambiguousByteCount;
-          ambiguousByteCount = 0;
-        }
-        str.append(buffer, startPosn, appendLength);
-        txtLength += appendLength;
-      }
-      if (bufferPosn >= bufferLength) {
-        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
-          ambiguousByteCount = delPosn;
-          bytesConsumed -= ambiguousByteCount; //to be consumed in next
-        }
-      }
-    } while (delPosn < recordDelimiterBytes.length
-        && bytesConsumed < maxBytesToConsume);
-    if (bytesConsumed > (long) Integer.MAX_VALUE) {
-      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
-    }
-    return (int) bytesConsumed;
-  }
-
-  /**
-   * Read from the InputStream into the given Text.
-   *
-   * @param str           the object to store the given line
-   * @param maxLineLength the maximum number of bytes to store into str.
-   * @return the number of bytes read including the newline
-   * @throws IOException if the underlying stream throws
-   */
-  public int readLine(Text str, int maxLineLength) throws IOException {
-    return readLine(str, maxLineLength, Integer.MAX_VALUE);
-  }
-
-  /**
-   * Read from the InputStream into the given Text.
-   *
-   * @param str the object to store the given line
-   * @return the number of bytes read including the newline
-   * @throws IOException if the underlying stream throws
-   */
-  public int readLine(Text str) throws IOException {
-    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
-  }
-}
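
LineReader works against any InputStream and handles LF, CR, CRLF or a custom byte-sequence delimiter. A self-contained sketch of the default-delimiter path (the sample records are illustrative):

  import org.apache.hadoop.io.Text;
  import org.apache.tajo.storage.LineReader;

  import java.io.ByteArrayInputStream;
  import java.io.IOException;

  public class LineReaderExample {
    public static void main(String[] args) throws IOException {
      // Three records terminated by CRLF, LF and end-of-stream respectively.
      byte[] data = "a,1\r\nb,2\nc,3".getBytes();
      LineReader reader = new LineReader(new ByteArrayInputStream(data));  // 64 KB default buffer
      Text line = new Text();
      int consumed;
      while ((consumed = reader.readLine(line)) > 0) {
        // 'line' holds the record without its terminator; 'consumed' includes the terminator.
        System.out.println(line + " (" + consumed + " bytes)");
      }
      reader.close();
    }
  }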

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
deleted file mode 100644
index e4439f3..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-public class MergeScanner implements Scanner {
-  private Configuration conf;
-  private TableMeta meta;
-  private Schema schema;
-  private List<FileFragment> fragments;
-  private Iterator<FileFragment> iterator;
-  private FileFragment currentFragment;
-  private Scanner currentScanner;
-  private Tuple tuple;
-  private boolean projectable = false;
-  private boolean selectable = false;
-  private Schema target;
-
-  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection<FileFragment> rawFragmentList)
-      throws IOException {
-    this(conf, schema, meta, rawFragmentList, schema);
-  }
-
-  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection<FileFragment> rawFragmentList,
-                      Schema target)
-      throws IOException {
-    this.conf = conf;
-    this.schema = schema;
-    this.meta = meta;
-    this.fragments = Lists.newArrayList();
-    for (Fragment f : rawFragmentList) {
-      fragments.add((FileFragment) f);
-    }
-    Collections.sort(fragments);
-
-    this.target = target;
-    this.reset();
-    if (currentScanner != null) {
-      this.projectable = currentScanner.isProjectable();
-      this.selectable = currentScanner.isSelectable();
-    }
-  }
-
-  @Override
-  public void init() throws IOException {
-  }
-
-  @Override
-  public Tuple next() throws IOException {
-    if (currentScanner != null)
-      tuple = currentScanner.next();
-
-    if (tuple != null) {
-      return tuple;
-    } else {
-      if (currentScanner != null) {
-        currentScanner.close();
-      }
-      currentScanner = getNextScanner();
-      if (currentScanner != null) {
-        tuple = currentScanner.next();
-      }
-    }
-    return tuple;
-  }
-
-  @Override
-  public void reset() throws IOException {
-    this.iterator = fragments.iterator();
-    this.currentScanner = getNextScanner();
-  }
-
-  private Scanner getNextScanner() throws IOException {
-    if (iterator.hasNext()) {
-      currentFragment = iterator.next();
-      currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, schema,
-          currentFragment, target);
-      currentScanner.init();
-      return currentScanner;
-    } else {
-      return null;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if(currentScanner != null) {
-      currentScanner.close();
-    }
-    iterator = null;
-    if(fragments != null) {
-      fragments.clear();
-    }
-  }
-
-  @Override
-  public boolean isProjectable() {
-    return projectable;
-  }
-
-  @Override
-  public void setTarget(Column[] targets) {
-    this.target = new Schema(targets);
-  }
-
-  @Override
-  public boolean isSelectable() {
-    return selectable;
-  }
-
-  @Override
-  public void setSearchCondition(Object expr) {
-  }
-
-  @Override
-  public Schema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public boolean isSplittable(){
-    return false;
-  }
-}
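
MergeScanner chains the fragments of a table behind the plain Scanner interface, opening each underlying scanner through the storage manager as the previous one is exhausted. A hedged sketch of draining it, assuming conf, schema, meta and the fragment list come from the surrounding query engine (types are the ones imported in the file above):

  // Counts the rows across all fragments of one table.
  static long countRows(TajoConf conf, Schema schema, TableMeta meta,
                        Collection<FileFragment> fragments) throws IOException {
    MergeScanner scanner = new MergeScanner(conf, schema, meta, fragments);
    scanner.init();
    long rows = 0;
    Tuple tuple;
    while ((tuple = scanner.next()) != null) {  // tuples arrive fragment by fragment, in sorted fragment order
      rows++;
    }
    scanner.close();
    return rows;
  }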

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
deleted file mode 100644
index 94d13ee..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.Path;
-
-import java.util.Comparator;
-
-public class NumericPathComparator implements Comparator<Path> {
-
-  @Override
-  public int compare(Path p1, Path p2) {
-    int num1 = Integer.parseInt(p1.getName());
-    int num2 = Integer.parseInt(p2.getName());
-
-    return num1 - num2;
-  }
-}
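
NumericPathComparator orders paths by the integer value of their final name component, which a plain lexicographic sort gets wrong (1, 10, 2). It assumes every name parses as an int, and the subtraction-based compare could overflow for extreme values. A small sketch with illustrative paths:

  import org.apache.hadoop.fs.Path;
  import org.apache.tajo.storage.NumericPathComparator;

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collections;
  import java.util.List;

  public class NumericPathSortExample {
    public static void main(String[] args) {
      List<Path> parts = new ArrayList<Path>(Arrays.asList(
          new Path("/staging/q_1/10"),
          new Path("/staging/q_1/2"),
          new Path("/staging/q_1/1")));
      Collections.sort(parts, new NumericPathComparator());
      System.out.println(parts);  // [/staging/q_1/1, /staging/q_1/2, /staging/q_1/10]
    }
  }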

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
deleted file mode 100644
index db511dc..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ /dev/null
@@ -1,532 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.protobuf.Message;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatumFactory;
-import org.apache.tajo.datum.TimestampDatum;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.BitArray;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Arrays;
-
-public class RawFile {
-  private static final Log LOG = LogFactory.getLog(RawFile.class);
-
-  public static class RawFileScanner extends FileScanner implements SeekableScanner {
-    private FileChannel channel;
-    private DataType[] columnTypes;
-    private Path path;
-
-    private ByteBuffer buffer;
-    private Tuple tuple;
-
-    private int headerSize = 0;
-    private BitArray nullFlags;
-    private static final int RECORD_SIZE = 4;
-    private boolean eof = false;
-    private long fileSize;
-    private FileInputStream fis;
-
-    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
-      super(conf, schema, meta, null);
-      this.path = path;
-      init();
-    }
-
-    @SuppressWarnings("unused")
-    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
-      this(conf, schema, meta, fragment.getPath());
-    }
-
-    public void init() throws IOException {
-      //Preconditions.checkArgument(FileUtil.isLocalPath(path));
-      // TODO - to make it unified one.
-      URI uri = path.toUri();
-      fis = new FileInputStream(new File(uri));
-      channel = fis.getChannel();
-      fileSize = channel.size();
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size());
-      }
-
-      buffer = ByteBuffer.allocateDirect(128 * 1024);
-
-      columnTypes = new DataType[schema.getColumnNum()];
-      for (int i = 0; i < schema.getColumnNum(); i++) {
-        columnTypes[i] = schema.getColumn(i).getDataType();
-      }
-
-      tuple = new VTuple(columnTypes.length);
-
-      // initial read
-      channel.read(buffer);
-      buffer.flip();
-
-      nullFlags = new BitArray(schema.getColumnNum());
-      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
-
-      super.init();
-    }
-
-    @Override
-    public long getNextOffset() throws IOException {
-      return channel.position() - buffer.remaining();
-    }
-
-    @Override
-    public void seek(long offset) throws IOException {
-      long currentPos = channel.position();
-      if(currentPos < offset &&  offset < currentPos + buffer.limit()){
-        buffer.position((int)(offset - currentPos));
-      } else {
-        buffer.clear();
-        channel.position(offset);
-        channel.read(buffer);
-        buffer.flip();
-        eof = false;
-      }
-    }
-
-    private boolean fillBuffer() throws IOException {
-      buffer.compact();
-      if (channel.read(buffer) == -1) {
-        eof = true;
-        return false;
-      } else {
-        buffer.flip();
-        return true;
-      }
-    }
-
-    @Override
-    public Tuple next() throws IOException {
-      if(eof) return null;
-
-      if (buffer.remaining() < headerSize) {
-        if (!fillBuffer()) {
-          return null;
-        }
-      }
-
-      // backup the buffer state
-      int bufferLimit = buffer.limit();
-      int recordSize = buffer.getInt();
-      int nullFlagSize = buffer.getShort();
-
-      buffer.limit(buffer.position() + nullFlagSize);
-      nullFlags.fromByteBuffer(buffer);
-      // restore the start of record contents
-      buffer.limit(bufferLimit);
-      //buffer.position(recordOffset + headerSize);
-      if (buffer.remaining() < (recordSize - headerSize)) {
-        if (!fillBuffer()) {
-          return null;
-        }
-      }
-
-      for (int i = 0; i < columnTypes.length; i++) {
-        // check if the i'th column is null
-        if (nullFlags.get(i)) {
-          tuple.put(i, DatumFactory.createNullDatum());
-          continue;
-        }
-
-        switch (columnTypes[i].getType()) {
-          case BOOLEAN :
-            tuple.put(i, DatumFactory.createBool(buffer.get()));
-            break;
-
-          case BIT :
-            tuple.put(i, DatumFactory.createBit(buffer.get()));
-            break;
-
-          case CHAR :
-            int realLen = buffer.getInt();
-            byte[] buf = new byte[columnTypes[i].getLength()];
-            buffer.get(buf);
-            byte[] charBuf = Arrays.copyOf(buf, realLen);
-            tuple.put(i, DatumFactory.createChar(charBuf));
-            break;
-
-          case INT2 :
-            tuple.put(i, DatumFactory.createInt2(buffer.getShort()));
-            break;
-
-          case INT4 :
-            tuple.put(i, DatumFactory.createInt4(buffer.getInt()));
-            break;
-
-          case INT8 :
-            tuple.put(i, DatumFactory.createInt8(buffer.getLong()));
-            break;
-
-          case FLOAT4 :
-            tuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
-            break;
-
-          case FLOAT8 :
-            tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
-            break;
-
-          case TEXT :
-            // TODO - should use CharsetEncoder / CharsetDecoder
-            //byte [] rawBytes = getColumnBytes();
-            int strSize2 = buffer.getInt();
-            byte [] strBytes2 = new byte[strSize2];
-            buffer.get(strBytes2);
-            tuple.put(i, DatumFactory.createText(new String(strBytes2)));
-            break;
-
-          case TIMESTAMP:
-            tuple.put(i, DatumFactory.createTimeStampFromMillis(buffer.getLong()));
-            break;
-
-          case BLOB : {
-            //byte [] rawBytes = getColumnBytes();
-            int byteSize = buffer.getInt();
-            byte [] rawBytes = new byte[byteSize];
-            buffer.get(rawBytes);
-            tuple.put(i, DatumFactory.createBlob(rawBytes));
-            break;
-          }
-
-          case PROTOBUF: {
-            //byte [] rawBytes = getColumnBytes();
-            int byteSize = buffer.getInt();
-            byte [] rawBytes = new byte[byteSize];
-            buffer.get(rawBytes);
-
-            ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]);
-            Message.Builder builder = factory.newBuilder();
-            builder.mergeFrom(rawBytes);
-            tuple.put(i, factory.createDatum(builder.build()));
-            break;
-          }
-
-          case INET4 :
-            byte [] ipv4Bytes = new byte[4];
-            buffer.get(ipv4Bytes);
-            tuple.put(i, DatumFactory.createInet4(ipv4Bytes));
-            break;
-
-          case NULL_TYPE:
-            tuple.put(i, NullDatum.get());
-            break;
-
-          default:
-        }
-      }
-
-      if(!buffer.hasRemaining() && channel.position() == fileSize){
-        eof = true;
-      }
-      return tuple;
-    }
-
-    @Override
-    public void reset() throws IOException {
-      // clear the buffer
-      buffer.clear();
-      // reload initial buffer
-      channel.position(0);
-      channel.read(buffer);
-      buffer.flip();
-      eof = false;
-    }
-
-    @Override
-    public void close() throws IOException {
-      buffer.clear();
-      channel.close();
-      fis.close();
-    }
-
-    @Override
-    public boolean isProjectable() {
-      return false;
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public boolean isSplittable(){
-      return false;
-    }
-  }
-
-  public static class RawFileAppender extends FileAppender {
-    private FileChannel channel;
-    private RandomAccessFile randomAccessFile;
-    private DataType[] columnTypes;
-
-    private ByteBuffer buffer;
-    private BitArray nullFlags;
-    private int headerSize = 0;
-    private static final int RECORD_SIZE = 4;
-    private long pos;
-
-    private TableStatistics stats;
-
-    public RawFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
-      super(conf, schema, meta, path);
-    }
-
-    public void init() throws IOException {
-      // TODO - RawFile only works on Local File System.
-      //Preconditions.checkArgument(FileUtil.isLocalPath(path));
-      File file = new File(path.toUri());
-      randomAccessFile = new RandomAccessFile(file, "rw");
-      channel = randomAccessFile.getChannel();
-      pos = 0;
-
-      columnTypes = new DataType[schema.getColumnNum()];
-      for (int i = 0; i < schema.getColumnNum(); i++) {
-        columnTypes[i] = schema.getColumn(i).getDataType();
-      }
-
-      buffer = ByteBuffer.allocateDirect(64 * 1024);
-
-      // compute the number of bytes representing the null flags
-
-      nullFlags = new BitArray(schema.getColumnNum());
-      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
-
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
-      }
-
-      super.init();
-    }
-
-    @Override
-    public long getOffset() throws IOException {
-      return pos;
-    }
-
-    private void flushBuffer() throws IOException {
-      buffer.limit(buffer.position());
-      buffer.flip();
-      channel.write(buffer);
-      buffer.clear();
-    }
-
-    private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten)
-        throws IOException {
-
-      // if the buffer cannot hold the bytes to be written, flush everything
-      // up to the start of the current record and compact the rest.
-      if (buffer.remaining() < sizeToBeWritten) {
-
-        int limit = buffer.position();
-        buffer.limit(recordOffset);
-        buffer.flip();
-        channel.write(buffer);
-        buffer.position(recordOffset);
-        buffer.limit(limit);
-        buffer.compact();
-
-        return true;
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public void addTuple(Tuple t) throws IOException {
-
-      if (buffer.remaining() < headerSize) {
-        flushBuffer();
-      }
-
-      // skip the row header
-      int recordOffset = buffer.position();
-      buffer.position(recordOffset + headerSize);
-      // reset the null flags
-      nullFlags.clear();
-      for (int i = 0; i < schema.getColumnNum(); i++) {
-        if (enabledStats) {
-          stats.analyzeField(i, t.get(i));
-        }
-
-        if (t.isNull(i)) {
-          nullFlags.set(i);
-          continue;
-        }
-
-        // 8 bytes is the maximum size of any fixed-length value
-        if (flushBufferAndReplace(recordOffset, 8)) {
-          recordOffset = 0;
-        }
-
-        switch(columnTypes[i].getType()) {
-          case NULL_TYPE:
-            nullFlags.set(i);
-            continue;
-
-          case BOOLEAN:
-          case BIT:
-            buffer.put(t.get(i).asByte());
-            break;
-
-          case CHAR :
-            byte[] src = t.getChar(i).asByteArray();
-            byte[] dst = Arrays.copyOf(src, columnTypes[i].getLength());
-            buffer.putInt(src.length);
-            buffer.put(dst);
-            break;
-
-          case INT2 :
-            buffer.putShort(t.get(i).asInt2());
-            break;
-
-          case INT4 :
-            buffer.putInt(t.get(i).asInt4());
-            break;
-
-          case INT8 :
-            buffer.putLong(t.get(i).asInt8());
-            break;
-
-          case FLOAT4 :
-            buffer.putFloat(t.get(i).asFloat4());
-            break;
-
-          case FLOAT8 :
-            buffer.putDouble(t.get(i).asFloat8());
-            break;
-
-          case TEXT:
-            byte [] strBytes2 = t.get(i).asByteArray();
-            if (flushBufferAndReplace(recordOffset, strBytes2.length + 4)) {
-              recordOffset = 0;
-            }
-            buffer.putInt(strBytes2.length);
-            buffer.put(strBytes2);
-            break;
-
-          case TIMESTAMP:
-            buffer.putLong(((TimestampDatum)t.get(i)).getMillis());
-            break;
-
-          case BLOB : {
-            byte [] rawBytes = t.get(i).asByteArray();
-            if (flushBufferAndReplace(recordOffset, rawBytes.length + 4)) {
-              recordOffset = 0;
-            }
-            buffer.putInt(rawBytes.length);
-            buffer.put(rawBytes);
-            break;
-          }
-
-          case PROTOBUF: {
-            // TODO - to be fixed
-//            byte [] lengthByte = new byte[4];
-//            byte [] byteArray = t.get(i).asByteArray();
-//            CodedOutputStream outputStream = CodedOutputStream.newInstance(lengthByte);
-//            outputStream.writeUInt32NoTag(byteArray.length);
-//            outputStream.flush();
-//            int legnthByteLength = CodedOutputStream.computeInt32SizeNoTag(byteArray.length);
-//            if (flushBufferAndReplace(recordOffset, byteArray.length + legnthByteLength)) {
-//              recordOffset = 0;
-//            }
-//            buffer.put(lengthByte, 0, legnthByteLength);
-            byte [] rawBytes = t.get(i).asByteArray();
-            if (flushBufferAndReplace(recordOffset, rawBytes.length + 4)) {
-              recordOffset = 0;
-            }
-            buffer.putInt(rawBytes.length);
-            buffer.put(rawBytes);
-            break;
-          }
-
-          case INET4 :
-            buffer.put(t.get(i).asByteArray());
-            break;
-
-          default:
-            throw new IOException("Cannot support data type: " + columnTypes[i].getType());
-        }
-      }
-
-      // write a record header
-      int bufferPos = buffer.position();
-      buffer.position(recordOffset);
-      buffer.putInt(bufferPos - recordOffset);
-      byte [] flags = nullFlags.toArray();
-      buffer.putShort((short) flags.length);
-      buffer.put(flags);
-
-      pos += bufferPos - recordOffset;
-      buffer.position(bufferPos);
-
-      if (enabledStats) {
-        stats.incrementRow();
-      }
-    }
-
-    @Override
-    public void flush() throws IOException {
-      flushBuffer();
-    }
-
-    @Override
-    public void close() throws IOException {
-      flush();
-      if (enabledStats) {
-        stats.setNumBytes(getOffset());
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
-      }
-      channel.close();
-      randomAccessFile.close();
-    }
-
-    @Override
-    public TableStats getStats() {
-      if (enabledStats) {
-        return stats.getTableStat();
-      } else {
-        return null;
-      }
-    }
-  }
-}
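
As a quick reference, the record framing that RawFileAppender.addTuple() writes above (and RawFileScanner.next() reads back) is: a 4-byte record length, a 2-byte null-flag length, the null-flag bytes, and then the non-null column values in schema order. Below is a minimal standalone sketch of that framing using only java.nio and java.util.BitSet; the class name and column values are illustrative, not part of the Tajo API.

import java.nio.ByteBuffer;
import java.util.BitSet;

// Illustrative sketch of the RawFile record framing:
// [int recordSize][short nullFlagLen][nullFlags][non-null column values...]
public class RawRecordSketch {
  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(64);

    int recordOffset = buffer.position();
    BitSet nullFlags = new BitSet(3);           // three columns in this example
    byte[] flagBytes = new byte[1];             // ceil(3 / 8) = 1 byte of flags
    int headerSize = 4 + 2 + flagBytes.length;  // recordSize + flagLen + flags

    buffer.position(recordOffset + headerSize); // skip the header, back-fill it later
    buffer.putInt(42);                          // column 0: INT4
    nullFlags.set(1);                           // column 1 is NULL, nothing written
    buffer.putLong(7L);                         // column 2: INT8

    // back-fill the record header, as addTuple() does after writing the columns
    int end = buffer.position();
    byte[] bits = nullFlags.toByteArray();
    System.arraycopy(bits, 0, flagBytes, 0, bits.length);
    buffer.position(recordOffset);
    buffer.putInt(end - recordOffset);
    buffer.putShort((short) flagBytes.length);
    buffer.put(flagBytes);
    buffer.position(end);

    System.out.println("record bytes written: " + (end - recordOffset));
  }
}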

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
deleted file mode 100644
index 1e89f31..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.BitArray;
-import org.apache.tajo.util.Bytes;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-
-public class RowFile {
-  public static final Log LOG = LogFactory.getLog(RowFile.class);
-
-  private static final int SYNC_ESCAPE = -1;
-  private static final int SYNC_HASH_SIZE = 16;
-  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;
-  private final static int DEFAULT_BUFFER_SIZE = 65535;
-  public static int SYNC_INTERVAL;
-
-  public static class RowFileScanner extends FileScanner {
-    private FileSystem fs;
-    private FSDataInputStream in;
-    private Tuple tuple;
-
-    private byte[] sync = new byte[SYNC_HASH_SIZE];
-    private byte[] checkSync = new byte[SYNC_HASH_SIZE];
-    private long start, end;
-
-    private ByteBuffer buffer;
-    private final int tupleHeaderSize;
-    private BitArray nullFlags;
-    private long bufferStartPos;
-
-    public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
-        throws IOException {
-      super(conf, schema, meta, fragment);
-
-      SYNC_INTERVAL =
-          conf.getInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname,
-              SYNC_SIZE * 100);
-
-      nullFlags = new BitArray(schema.getColumnNum());
-      tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
-      this.start = fragment.getStartKey();
-      this.end = this.start + fragment.getEndKey();
-    }
-
-    public void init() throws IOException {
-      // open the fragment's file and allocate the read buffer.
-      fs = fragment.getPath().getFileSystem(conf);
-      in = fs.open(fragment.getPath());
-      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * schema.getColumnNum());
-      buffer.flip();
-
-      readHeader();
-
-      // find the correct position from the start
-      if (this.start > in.getPos()) {
-        long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0;
-        in.seek(realStart);
-      }
-      bufferStartPos = in.getPos();
-      fillBuffer();
-
-      if (start != 0) {
-        // TODO: improve
-        boolean syncFound = false;
-        while (!syncFound) {
-          if (buffer.remaining() < SYNC_SIZE) {
-            fillBuffer();
-          }
-          buffer.mark();
-          syncFound = checkSync();
-          if (!syncFound) {
-            buffer.reset();
-            buffer.get(); // advance one byte
-          }
-        }
-        bufferStartPos += buffer.position();
-        buffer.compact();
-        buffer.flip();
-      }
-
-      super.init();
-    }
-
-    private void readHeader() throws IOException {
-      SYNC_INTERVAL = in.readInt();
-      Bytes.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
-    }
-
-    /**
-     * Finds the sync marker at the front of the buffer.
-     *
-     * @return true if the sync marker is found.
-     * @throws IOException
-     */
-    private boolean checkSync() throws IOException {
-      buffer.getInt();                           // escape
-      buffer.get(checkSync, 0, SYNC_HASH_SIZE);  // sync
-      return Arrays.equals(checkSync, sync);
-    }
-
-    private int fillBuffer() throws IOException {
-      bufferStartPos += buffer.position();
-      buffer.compact();
-      int remain = buffer.remaining();
-      int read = in.read(buffer);
-      if (read == -1) {
-        buffer.flip();
-        return read;
-      } else {
-        int totalRead = read;
-        if (remain > totalRead) {
-          read = in.read(buffer);
-          totalRead += read > 0 ? read : 0;
-        }
-        buffer.flip();
-        return totalRead;
-      }
-    }
-
-    @Override
-    public Tuple next() throws IOException {
-      while (buffer.remaining() < SYNC_SIZE) {
-        if (fillBuffer() < 0) {
-          return null;
-        }
-      }
-
-      buffer.mark();
-      if (!checkSync()) {
-        buffer.reset();
-      } else {
-        if (bufferStartPos + buffer.position() > end) {
-          return null;
-        }
-      }
-
-      while (buffer.remaining() < tupleHeaderSize) {
-        if (fillBuffer() < 0) {
-          return null;
-        }
-      }
-
-      int i;
-      tuple = new VTuple(schema.getColumnNum());
-
-      int nullFlagSize = buffer.getShort();
-      byte[] nullFlagBytes = new byte[nullFlagSize];
-      buffer.get(nullFlagBytes, 0, nullFlagSize);
-      nullFlags = new BitArray(nullFlagBytes);
-      int tupleSize = buffer.getShort();
-
-      while (buffer.remaining() < (tupleSize)) {
-        if (fillBuffer() < 0) {
-          return null;
-        }
-      }
-
-      Datum datum;
-      Column col;
-      for (i = 0; i < schema.getColumnNum(); i++) {
-        if (!nullFlags.get(i)) {
-          col = schema.getColumn(i);
-          switch (col.getDataType().getType()) {
-            case BOOLEAN :
-              datum = DatumFactory.createBool(buffer.get());
-              tuple.put(i, datum);
-              break;
-
-            case BIT:
-              datum = DatumFactory.createBit(buffer.get());
-              tuple.put(i, datum );
-              break;
-
-            case CHAR :
-              int realLen = buffer.getInt();
-              byte[] buf = new byte[col.getDataType().getLength()];
-              buffer.get(buf);
-              byte[] charBuf = Arrays.copyOf(buf, realLen);
-              tuple.put(i, DatumFactory.createChar(charBuf));
-              break;
-
-            case INT2 :
-              datum = DatumFactory.createInt2(buffer.getShort());
-              tuple.put(i, datum );
-              break;
-
-            case INT4 :
-              datum = DatumFactory.createInt4(buffer.getInt());
-              tuple.put(i, datum );
-              break;
-
-            case INT8 :
-              datum = DatumFactory.createInt8(buffer.getLong());
-              tuple.put(i, datum );
-              break;
-
-            case FLOAT4 :
-              datum = DatumFactory.createFloat4(buffer.getFloat());
-              tuple.put(i, datum);
-              break;
-
-            case FLOAT8 :
-              datum = DatumFactory.createFloat8(buffer.getDouble());
-              tuple.put(i, datum);
-              break;
-
-//            case TEXT :
-//              short len = buffer.getShort();
-//              byte[] buf = new byte[len];
-//              buffer.get(buf, 0, len);
-//              datum = DatumFactory.createText(buf);
-//              tuple.put(i, datum);
-//              break;
-
-            case TEXT:
-              short bytelen = buffer.getShort();
-              byte[] strbytes = new byte[bytelen];
-              buffer.get(strbytes, 0, bytelen);
-              datum = DatumFactory.createText(strbytes);
-              tuple.put(i, datum);
-              break;
-
-            case BLOB:
-              short bytesLen = buffer.getShort();
-              byte [] bytesBuf = new byte[bytesLen];
-              buffer.get(bytesBuf);
-              datum = DatumFactory.createBlob(bytesBuf);
-              tuple.put(i, datum);
-              break;
-
-            case INET4 :
-              byte[] ipv4 = new byte[4];
-              buffer.get(ipv4, 0, 4);
-              datum = DatumFactory.createInet4(ipv4);
-              tuple.put(i, datum);
-              break;
-
-            default:
-              break;
-          }
-        } else {
-          tuple.put(i, DatumFactory.createNullDatum());
-        }
-      }
-      return tuple;
-    }
-
-    @Override
-    public void reset() throws IOException {
-      init();
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (in != null) {
-        in.close();
-      }
-    }
-
-    @Override
-    public boolean isProjectable() {
-      return false;
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public boolean isSplittable(){
-      return true;
-    }
-  }
-
-  public static class RowFileAppender extends FileAppender {
-    private FSDataOutputStream out;
-    private long lastSyncPos;
-    private FileSystem fs;
-    private byte[] sync;
-    private ByteBuffer buffer;
-
-    private BitArray nullFlags;
-    // statistics
-    private TableStatistics stats;
-
-    public RowFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path)
-        throws IOException {
-      super(conf, schema, meta, path);
-    }
-
-    public void init() throws IOException {
-      SYNC_INTERVAL = conf.getInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname, 100);
-
-      fs = path.getFileSystem(conf);
-
-      if (!fs.exists(path.getParent())) {
-        throw new FileNotFoundException(path.toString());
-      }
-
-      if (fs.exists(path)) {
-        throw new AlreadyExistsStorageException(path);
-      }
-
-      sync = new byte[SYNC_HASH_SIZE];
-      lastSyncPos = 0;
-
-      out = fs.create(path);
-
-      MessageDigest md;
-      try {
-        md = MessageDigest.getInstance("MD5");
-        md.update((path.toString()+System.currentTimeMillis()).getBytes());
-        sync = md.digest();
-      } catch (NoSuchAlgorithmException e) {
-        LOG.error(e);
-      }
-
-      writeHeader();
-
-      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
-
-      nullFlags = new BitArray(schema.getColumnNum());
-
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
-      }
-    }
-
-    private void writeHeader() throws IOException {
-      out.writeInt(SYNC_INTERVAL);
-      out.write(sync);
-      out.flush();
-      lastSyncPos = out.getPos();
-    }
-
-    @Override
-    public void addTuple(Tuple t) throws IOException {
-      checkAndWriteSync();
-      Column col;
-
-      buffer.clear();
-      nullFlags.clear();
-
-      for (int i = 0; i < schema.getColumnNum(); i++) {
-        if (enabledStats) {
-          stats.analyzeField(i, t.get(i));
-        }
-
-        if (t.isNull(i)) {
-          nullFlags.set(i);
-        } else {
-          col = schema.getColumn(i);
-          switch (col.getDataType().getType()) {
-            case BOOLEAN:
-              buffer.put(t.getBoolean(i).asByte());
-              break;
-            case BIT:
-              buffer.put(t.getByte(i).asByte());
-              break;
-            case CHAR:
-              byte[] src = t.getChar(i).asByteArray();
-              byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
-              buffer.putInt(src.length);
-              buffer.put(dst);
-              break;
-            case TEXT:
-              byte [] strbytes = t.getText(i).asByteArray();
-              buffer.putShort((short)strbytes.length);
-              buffer.put(strbytes, 0, strbytes.length);
-              break;
-            case INT2:
-              buffer.putShort(t.getShort(i).asInt2());
-              break;
-            case INT4:
-              buffer.putInt(t.getInt(i).asInt4());
-              break;
-            case INT8:
-              buffer.putLong(t.getLong(i).asInt8());
-              break;
-            case FLOAT4:
-              buffer.putFloat(t.getFloat(i).asFloat4());
-              break;
-            case FLOAT8:
-              buffer.putDouble(t.getDouble(i).asFloat8());
-              break;
-            case BLOB:
-              byte [] bytes = t.getBytes(i).asByteArray();
-              buffer.putShort((short)bytes.length);
-              buffer.put(bytes);
-              break;
-            case INET4:
-              buffer.put(t.getIPv4Bytes(i));
-              break;
-            case INET6:
-              buffer.put(t.getIPv6Bytes(i));
-              break;
-            case NULL_TYPE:
-              nullFlags.set(i);
-              break;
-            default:
-              break;
-          }
-        }
-      }
-
-      byte[] bytes = nullFlags.toArray();
-      out.writeShort(bytes.length);
-      out.write(bytes);
-
-      bytes = buffer.array();
-      int dataLen = buffer.position();
-      out.writeShort(dataLen);
-      out.write(bytes, 0, dataLen);
-
-      // Statistical section
-      if (enabledStats) {
-        stats.incrementRow();
-      }
-    }
-
-    @Override
-    public long getOffset() throws IOException {
-      return out.getPos();
-    }
-
-    @Override
-    public void flush() throws IOException {
-      out.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (out != null) {
-        if (enabledStats) {
-          stats.setNumBytes(out.getPos());
-        }
-        sync();
-        out.flush();
-        out.close();
-      }
-    }
-
-    private void sync() throws IOException {
-      if (lastSyncPos != out.getPos()) {
-        out.writeInt(SYNC_ESCAPE);
-        out.write(sync);
-        lastSyncPos = out.getPos();
-      }
-    }
-
-    private void checkAndWriteSync() throws IOException {
-      if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
-        sync();
-      }
-    }
-
-    @Override
-    public TableStats getStats() {
-      if (enabledStats) {
-        return stats.getTableStat();
-      } else {
-        return null;
-      }
-    }
-  }
-}
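
RowFile relies on a 16-byte MD5 sync marker to make the format splittable: writeHeader() stores the sync interval and the marker at the front of the file, checkAndWriteSync() re-emits an escape int (-1) followed by the same marker roughly every SYNC_INTERVAL bytes, and RowFileScanner.checkSync() scans for that marker to resynchronize when a fragment starts mid-file. Below is a minimal standalone sketch of generating such a marker and writing the header plus one sync point; the path string and class name are made up for illustration.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative sketch of the RowFile header and sync marker.
public class SyncMarkerSketch {
  static final int SYNC_ESCAPE = -1;
  static final int SYNC_HASH_SIZE = 16;                        // MD5 digest length
  static final int SYNC_INTERVAL = (4 + SYNC_HASH_SIZE) * 100;

  public static void main(String[] args) throws IOException, NoSuchAlgorithmException {
    // marker derived from the output path and the current time, as in RowFileAppender.init()
    MessageDigest md = MessageDigest.getInstance("MD5");
    md.update(("/tmp/example.row" + System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
    byte[] sync = md.digest();                                 // 16 bytes

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);

    // file header: sync interval followed by the marker
    out.writeInt(SYNC_INTERVAL);
    out.write(sync);

    // ... tuples would be written here; roughly every SYNC_INTERVAL bytes
    // the appender emits the escape int followed by the same marker:
    out.writeInt(SYNC_ESCAPE);
    out.write(sync);

    System.out.println("bytes written: " + bos.size());
  }
}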

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
deleted file mode 100644
index 9f32028..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.TimestampDatum;
-import org.apache.tajo.util.Bytes;
-
-import java.nio.ByteBuffer;
-
-public class RowStoreUtil {
-  public static int[] getTargetIds(Schema inSchema, Schema outSchema) {
-    int[] targetIds = new int[outSchema.getColumnNum()];
-    int i = 0;
-    for (Column target : outSchema.getColumns()) {
-      targetIds[i] = inSchema.getColumnId(target.getQualifiedName());
-      i++;
-    }
-
-    return targetIds;
-  }
-
-  public static Tuple project(Tuple in, Tuple out, int[] targetIds) {
-    out.clear();
-    for (int idx = 0; idx < targetIds.length; idx++) {
-      out.put(idx, in.get(targetIds[idx]));
-    }
-    return out;
-  }
-
-  public static class RowStoreDecoder {
-
-    public static Tuple toTuple(Schema schema, byte [] bytes) {
-      ByteBuffer bb = ByteBuffer.wrap(bytes);
-      Tuple tuple = new VTuple(schema.getColumnNum());
-      Column col;
-      TajoDataTypes.DataType type;
-      for (int i =0; i < schema.getColumnNum(); i++) {
-        col = schema.getColumn(i);
-        type = col.getDataType();
-        switch (type.getType()) {
-          case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break;
-          case BIT:
-            byte b = bb.get();
-            if(b == 0) {
-              tuple.put(i, DatumFactory.createNullDatum());
-            } else {
-              tuple.put(i, DatumFactory.createBit(b));
-            }
-            break;
-
-          case CHAR:
-            byte c = bb.get();
-            if(c == 0) {
-              tuple.put(i, DatumFactory.createNullDatum());
-            } else {
-              tuple.put(i, DatumFactory.createChar(c));
-            }
-            break;
-
-          case INT2:
-            short s = bb.getShort();
-            if(s < Short.MIN_VALUE + 1) {
-              tuple.put(i, DatumFactory.createNullDatum());
-            }else {
-              tuple.put(i, DatumFactory.createInt2(s));
-            }
-            break;
-
-          case INT4:
-          case DATE:
-            int i_ = bb.getInt();
-            if ( i_ < Integer.MIN_VALUE + 1) {
-              tuple.put(i, DatumFactory.createNullDatum());
-            } else {
-              tuple.put(i, DatumFactory.createFromInt4(type, i_));
-            }
-            break;
-
-          case INT8:
-          case TIME:
-          case TIMESTAMP:
-            long l = bb.getLong();
-            if ( l < Long.MIN_VALUE + 1) {
-              tuple.put(i, DatumFactory.createNullDatum());
-            }else {
-              tuple.put(i, DatumFactory.createFromInt8(type, l));
-            }
-            break;
-
-          case FLOAT4:
-            float f = bb.getFloat();
-            if (Float.isNaN(f)) {
-              tuple.put(i, DatumFactory.createNullDatum());
-            }else {
-              tuple.put(i, DatumFactory.createFloat4(f));
-            }
-            break;
-
-          case FLOAT8:
-            double d = bb.getDouble();
-            if(Double.isNaN(d)) {
-              tuple.put(i, DatumFactory.createNullDatum());
-            }else {
-              tuple.put(i, DatumFactory.createFloat8(d));
-            }
-            break;
-
-          case TEXT:
-            byte [] _string = new byte[bb.getInt()];
-            bb.get(_string);
-            String str = new String(_string);
-            if(str.compareTo("NULL") == 0) {
-              tuple.put(i, DatumFactory.createNullDatum());
-            }else {
-            tuple.put(i, DatumFactory.createText(str));
-            }
-            break;
-
-          case BLOB:
-            byte [] _bytes = new byte[bb.getInt()];
-            bb.get(_bytes);
-            if(Bytes.compareTo(bytes, Bytes.toBytes("NULL")) == 0) {
-              tuple.put(i, DatumFactory.createNullDatum());
-            } else {
-              tuple.put(i, DatumFactory.createBlob(_bytes));
-            }
-            break;
-
-          case INET4:
-            byte [] _ipv4 = new byte[4];
-            bb.get(_ipv4);
-            tuple.put(i, DatumFactory.createInet4(_ipv4));
-            break;
-          case INET6:
-            // TODO - to be implemented
-        }
-      }
-      return tuple;
-    }
-  }
-
-  public static class RowStoreEncoder {
-
-    public static byte [] toBytes(Schema schema, Tuple tuple) {
-      int size = StorageUtil.getRowByteSize(schema);
-      ByteBuffer bb = ByteBuffer.allocate(size);
-      Column col;
-      for (int i = 0; i < schema.getColumnNum(); i++) {
-        col = schema.getColumn(i);
-        switch (col.getDataType().getType()) {
-          case BOOLEAN: bb.put(tuple.get(i).asByte()); break;
-          case BIT: bb.put(tuple.get(i).asByte()); break;
-          case CHAR: bb.put(tuple.get(i).asByte()); break;
-          case INT2: bb.putShort(tuple.get(i).asInt2()); break;
-          case INT4: bb.putInt(tuple.get(i).asInt4()); break;
-          case INT8: bb.putLong(tuple.get(i).asInt8()); break;
-          case FLOAT4: bb.putFloat(tuple.get(i).asFloat4()); break;
-          case FLOAT8: bb.putDouble(tuple.get(i).asFloat8()); break;
-          case TEXT:
-            byte [] _string = tuple.get(i).asByteArray();
-            bb.putInt(_string.length);
-            bb.put(_string);
-            break;
-          case DATE: bb.putInt(tuple.get(i).asInt4()); break;
-          case TIMESTAMP: bb.putLong(((TimestampDatum)tuple.get(i)).getMillis()); break;
-          case BLOB:
-            byte [] bytes = tuple.get(i).asByteArray();
-            bb.putInt(bytes.length);
-            bb.put(bytes);
-            break;
-          case INET4:
-            byte [] ipBytes = tuple.getIPv4Bytes(i);
-            bb.put(ipBytes);
-            break;
-          case INET6: bb.put(tuple.getIPv6Bytes(i)); break;
-          default:
-        }
-      }
-
-      bb.flip();
-      byte [] buf = new byte [bb.limit()];
-      bb.get(buf);
-      return buf;
-    }
-  }
-}
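
RowStoreEncoder and RowStoreDecoder above use a simple positional layout driven by the schema: fixed-width types are written directly (one byte for BOOLEAN, BIT, and CHAR; 2/4/8 bytes for INT2/INT4/INT8; 4/8 bytes for FLOAT4/FLOAT8), while TEXT and BLOB are written as a 4-byte length followed by the raw bytes, and the decoder reads the columns back in the same order. Below is a minimal standalone round trip of that layout for an (INT4, INT8, TEXT) row, using only java.nio and no Tajo classes; the values are illustrative.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative round trip of the RowStoreUtil layout for an (INT4, INT8, TEXT) row.
public class RowStoreLayoutSketch {
  public static void main(String[] args) {
    byte[] text = "hello".getBytes(StandardCharsets.UTF_8);

    // encode: fixed-width values directly, variable-width as a 4-byte length plus bytes
    ByteBuffer bb = ByteBuffer.allocate(4 + 8 + 4 + text.length);
    bb.putInt(123);            // INT4 column
    bb.putLong(456L);          // INT8 column
    bb.putInt(text.length);    // TEXT length
    bb.put(text);              // TEXT bytes
    bb.flip();
    byte[] row = new byte[bb.limit()];
    bb.get(row);

    // decode in the same column order
    ByteBuffer in = ByteBuffer.wrap(row);
    int    c0 = in.getInt();
    long   c1 = in.getLong();
    byte[] c2 = new byte[in.getInt()];
    in.get(c2);

    System.out.println(c0 + ", " + c1 + ", " + new String(c2, StandardCharsets.UTF_8));
  }
}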