Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:25 UTC

[10/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
new file mode 100644
index 0000000..a0ad492
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.NumberUtil;
+
+import java.io.IOException;
+
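+/**
+ * Converts between Tajo {@link Datum} values and HBase cell bytes using a plain text
+ * encoding. Missing values and unsupported column types deserialize to {@link NullDatum}.
+ */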
+public class HBaseTextSerializerDeserializer {
+  public static Datum deserialize(Column col, byte[] bytes) throws IOException {
+    Datum datum;
+    switch (col.getDataType().getType()) {
+      case INT1:
+      case INT2:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+            DatumFactory.createInt2((short)NumberUtil.parseInt(bytes, 0, bytes.length));
+        break;
+      case INT4:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+            DatumFactory.createInt4(NumberUtil.parseInt(bytes, 0, bytes.length));
+        break;
+      case INT8:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+            DatumFactory.createInt8(new String(bytes, 0, bytes.length));
+        break;
+      case FLOAT4:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+            DatumFactory.createFloat4(new String(bytes, 0, bytes.length));
+        break;
+      case FLOAT8:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+            DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, 0, bytes.length));
+        break;
+      case TEXT:
+        datum = bytes == null ? NullDatum.get() : DatumFactory.createText(bytes);
+        break;
+      default:
+        datum = NullDatum.get();
+        break;
+    }
+    return datum;
+  }
+
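+  // Null datums serialize to null; all other datums are written as their text form.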
+  public static byte[] serialize(Column col, Datum datum) throws IOException {
+    if (datum == null || datum instanceof NullDatum) {
+      return null;
+    }
+
+    return datum.asChars().getBytes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
new file mode 100644
index 0000000..07f7988
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.util.TreeSet;
+
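+/**
+ * Appender that writes Tajo tuples as HBase {@link KeyValue}s into HFiles through
+ * {@link HFileOutputFormat2}. KeyValues are buffered per row key in a sorted set and
+ * flushed to the record writer whenever the row key changes, and again on close.
+ */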
+public class HFileAppender extends AbstractHBaseAppender {
+  private static final Log LOG = LogFactory.getLog(HFileAppender.class);
+
+  private RecordWriter<ImmutableBytesWritable, Cell> writer;
+  private TaskAttemptContext writerContext;
+  private Path workingFilePath;
+  private FileOutputCommitter committer;
+
+  public HFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+                       Schema schema, TableMeta meta, Path stagingDir) {
+    super(conf, taskAttemptId, schema, meta, stagingDir);
+  }
+
+  @Override
+  public void init() throws IOException {
+    super.init();
+
+    Configuration taskConf = new Configuration();
+    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString());
+
+    ExecutionBlockId ebId = taskAttemptId.getQueryUnitId().getExecutionBlockId();
+    writerContext = new TaskAttemptContextImpl(taskConf,
+        new TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP,
+            taskAttemptId.getQueryUnitId().getId(), taskAttemptId.getId()));
+
+    HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2();
+    try {
+      writer = hFileOutputFormat2.getRecordWriter(writerContext);
+
+      committer = new FileOutputCommitter(FileOutputFormat.getOutputPath(writerContext), writerContext);
+      workingFilePath = committer.getWorkPath();
+    } catch (InterruptedException e) {
+      throw new IOException(e.getMessage(), e);
+    }
+
+    LOG.info("Created hbase file writer: " + workingFilePath);
+  }
+
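+  // Buffer of KeyValues for the current row key, kept sorted by KeyValue.COMPARATOR
+  // until the row key changes (see addTuple) or the appender is closed.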
+  long totalNumBytes = 0;
+  ImmutableBytesWritable keyWritable = new ImmutableBytesWritable();
+  boolean first = true;
+  TreeSet<KeyValue> kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+
+
+  @Override
+  public void addTuple(Tuple tuple) throws IOException {
+    Datum datum;
+
+    byte[] rowkey = getRowKeyBytes(tuple);
+
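+    // A new row key means the previous row is complete: write out its buffered KeyValues.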
+    if (!first && !Bytes.equals(keyWritable.get(), 0, keyWritable.getLength(), rowkey, 0, rowkey.length)) {
+      try {
+        for (KeyValue kv : kvSet) {
+          writer.write(keyWritable, kv);
+          totalNumBytes += keyWritable.getLength() + kv.getLength();
+        }
+        kvSet.clear();
+        // Statistical section
+        if (enabledStats) {
+          stats.incrementRow();
+        }
+      } catch (InterruptedException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    first = false;
+
+    keyWritable.set(rowkey);
+
+    readKeyValues(tuple, rowkey);
+    if (keyValues != null) {
+      for (KeyValue eachKeyVal: keyValues) {
+        kvSet.add(eachKeyVal);
+      }
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+  }
+
+  @Override
+  public long getEstimatedOutputSize() throws IOException {
+    // StoreTableExec uses this value as the rolling file length;
+    // this appender does not roll files, so always return 0.
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!kvSet.isEmpty()) {
+      try {
+        for (KeyValue kv : kvSet) {
+          writer.write(keyWritable, kv);
+          totalNumBytes += keyWritable.getLength() + kv.getLength();
+        }
+        kvSet.clear();
+        // Statistical section
+        if (enabledStats) {
+          stats.incrementRow();
+        }
+      } catch (InterruptedException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    if (enabledStats) {
+      stats.setNumBytes(totalNumBytes);
+    }
+    if (writer != null) {
+      try {
+        writer.close(writerContext);
+        committer.commitTask(writerContext);
+      } catch (InterruptedException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
new file mode 100644
index 0000000..3a58e50
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
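+/**
+ * A single indexable predicate over a column, expressed as the start and stop values
+ * of the matching range.
+ */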
+public class IndexPredication {
+  private Column column;
+  private int columnId;
+  private Datum startValue;
+  private Datum stopValue;
+
+  public Column getColumn() {
+    return column;
+  }
+
+  public void setColumn(Column column) {
+    this.column = column;
+  }
+
+  public int getColumnId() {
+    return columnId;
+  }
+
+  public void setColumnId(int columnId) {
+    this.columnId = columnId;
+  }
+
+  public Datum getStartValue() {
+    return startValue;
+  }
+
+  public void setStartValue(Datum startValue) {
+    this.startValue = startValue;
+  }
+
+  public Datum getStopValue() {
+    return stopValue;
+  }
+
+  public void setStopValue(Datum stopValue) {
+    this.stopValue = stopValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
new file mode 100644
index 0000000..4577703
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
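+/**
+ * Describes how a table column maps onto the HBase row key: whether the key bytes are
+ * binary-encoded and which key field index the column supplies.
+ */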
+public class RowKeyMapping {
+  private boolean isBinary;
+  private int keyFieldIndex;
+
+  public boolean isBinary() {
+    return isBinary;
+  }
+
+  public void setBinary(boolean isBinary) {
+    this.isBinary = isBinary;
+  }
+
+  public int getKeyFieldIndex() {
+    return keyFieldIndex;
+  }
+
+  public void setKeyFieldIndex(int keyFieldIndex) {
+    this.keyFieldIndex = keyFieldIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
new file mode 100644
index 0000000..668b116
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.storage.hbase";
+option java_outer_classname = "StorageFragmentProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
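+// One HBase fragment: the startRow/stopRow scan range on an HBase table, a length
+// estimate, a last-fragment flag, and an optional region location.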
+message HBaseFragmentProto {
+  required string tableName = 1;
+  required string hbaseTableName = 2;
+  required bytes startRow = 3;
+  required bytes stopRow = 4;
+  required bool last = 5;
+  required int64 length = 6;
+  optional string regionLocation = 7;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
new file mode 100644
index 0000000..68939d6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TestColumnMapping {
+  @Test
+  public void testColumnKeyValueMapping() throws Exception {
+    KeyValueSet keyValueSet = new KeyValueSet();
+    keyValueSet.set(HBaseStorageConstants.META_TABLE_KEY, "test");
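+    // Mapping spec: c1 -> row key, c2 -> keys of family "col2", c3 -> values of family "col2"
+    // (binary, "#b"), c4 -> family "col3".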
+    keyValueSet.set(HBaseStorageConstants.META_COLUMNS_KEY, ":key,col2:key:,col2:value:#b,col3:");
+
+    Schema schema = new Schema();
+    schema.addColumn("c1", Type.TEXT);
+    schema.addColumn("c2", Type.TEXT);
+    schema.addColumn("c3", Type.TEXT);
+    schema.addColumn("c4", Type.TEXT);
+
+    TableMeta tableMeta = new TableMeta(StoreType.HBASE, keyValueSet);
+
+    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+
+    List<String> cfNames = columnMapping.getColumnFamilyNames();
+    assertEquals(2, cfNames.size());
+    assertEquals("col2", cfNames.get(0));
+    assertEquals("col3", cfNames.get(1));
+
+    for (int i = 0; i < columnMapping.getIsBinaryColumns().length; i++) {
+      if (i == 2) {
+        assertTrue(columnMapping.getIsBinaryColumns()[i]);
+      } else {
+        assertFalse(columnMapping.getIsBinaryColumns()[i]);
+      }
+    }
+
+    for (int i = 0; i < columnMapping.getIsRowKeyMappings().length; i++) {
+      if (i == 0) {
+        assertTrue(columnMapping.getIsRowKeyMappings()[i]);
+      } else {
+        assertFalse(columnMapping.getIsRowKeyMappings()[i]);
+      }
+    }
+
+    String[] expectedColumnNames = { null, null, null, null};
+    for (int i = 0; i < schema.size(); i++) {
+      String columnName = columnMapping.getMappingColumns()[i][1] == null ? null :
+          new String(columnMapping.getMappingColumns()[i][1]);
+      assertEquals(expectedColumnNames[i], columnName);
+    }
+
+    for (int i = 0; i < schema.size(); i++) {
+      if (i == 1) {
+        assertTrue(columnMapping.getIsColumnKeys()[i]);
+      } else {
+        assertFalse(columnMapping.getIsColumnKeys()[i]);
+      }
+    }
+
+    for (int i = 0; i < schema.size(); i++) {
+      if (i == 2) {
+        assertTrue(columnMapping.getIsColumnValues()[i]);
+      } else {
+        assertFalse(columnMapping.getIsColumnValues()[i]);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
new file mode 100644
index 0000000..1fc4065
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.util.Pair;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestHBaseStorageManager {
+  @Test
+  public void testGetIndexPredications() throws Exception {
+    Column rowkeyColumn = new Column("rk", Type.TEXT);
+    // where rk >= '020' and rk <= '055'
+    ScanNode scanNode = new ScanNode(1);
+    EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("020")));
+    EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("055")));
+    EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
+    scanNode.setQual(evalNodeA);
+
+    HBaseStorageManager storageManager =
+        (HBaseStorageManager) StorageManager.getStorageManager(new TajoConf(), StoreType.HBASE);
+    List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+    assertNotNull(indexEvals);
+    assertEquals(1, indexEvals.size());
+    Pair<Datum, Datum> indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+    assertEquals("020", indexPredicateValue.getFirst().asChars());
+    assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+    // where (rk >= '020' and rk <= '055') or rk = '075'
+    EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("075")));
+    EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
+    scanNode.setQual(evalNodeB);
+    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+    assertEquals(2, indexEvals.size());
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+    assertEquals("020", indexPredicateValue.getFirst().asChars());
+    assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+    assertEquals("075", indexPredicateValue.getFirst().asChars());
+    assertEquals("075", indexPredicateValue.getSecond().asChars());
+
+    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
+    EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
+    EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
+    EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+    EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
+    scanNode.setQual(evalNodeD);
+    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+    assertEquals(2, indexEvals.size());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+    assertEquals("020", indexPredicateValue.getFirst().asChars());
+    assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+    assertEquals("072", indexPredicateValue.getFirst().asChars());
+    assertEquals("078", indexPredicateValue.getSecond().asChars());
+
+    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078' and rk >= '073')
+    evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
+    evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
+    evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+    EvalNode evalNode6 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("073")));
+    evalNodeD = new BinaryEval(EvalType.AND, evalNodeC, evalNode6);
+    EvalNode evalNodeE = new BinaryEval(EvalType.OR, evalNodeA, evalNodeD);
+    scanNode.setQual(evalNodeE);
+    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+    assertEquals(2, indexEvals.size());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+    assertEquals("020", indexPredicateValue.getFirst().asChars());
+    assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+    assertEquals("073", indexPredicateValue.getFirst().asChars());
+    assertEquals("078", indexPredicateValue.getSecond().asChars());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
new file mode 100644
index 0000000..5105ac5
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -0,0 +1,380 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright 2012 Database Lab., Korea Univ.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.9.1-SNAPSHOT</version>
+    <relativePath>../../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tajo-storage-hdfs</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo HDFS Storage</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <parquet.version>1.5.0</parquet.version>
+    <parquet.format.version>2.1.0</parquet.format.version>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>repository.jboss.org</id>
+      <url>https://repository.jboss.org/nexus/content/repositories/releases/
+      </url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/testVariousTypes.avsc</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <tajo.test>TRUE</tajo.test>
+          </systemProperties>
+          <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-protobuf-generated-sources-directory</id>
+            <phase>initialize</phase>
+            <configuration>
+              <target>
+                <mkdir dir="target/generated-sources/proto" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2</version>
+        <executions>
+          <execution>
+            <id>generate-sources</id>
+            <phase>generate-sources</phase>
+            <configuration>
+              <executable>protoc</executable>
+              <arguments>
+                <argument>-Isrc/main/proto/</argument>
+                <argument>--proto_path=../../tajo-common/src/main/proto</argument>
+                <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
+                <argument>--java_out=target/generated-sources/proto</argument>
+                <argument>src/main/proto/StorageFragmentProtos.proto</argument>
+              </arguments>
+            </configuration>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.5</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>target/generated-sources/proto</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-plan</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>trevni-core</artifactId>
+      <version>1.7.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>trevni-avro</artifactId>
+      <version>1.7.3</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>zookeeper</artifactId>
+          <groupId>org.apache.zookeeper</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>slf4j-api</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jersey-json</artifactId>
+          <groupId>com.sun.jersey</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-yarn-server-tests</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-app</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-yarn-api</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-hs</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-column</artifactId>
+      <version>${parquet.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>${parquet.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-format</artifactId>
+      <version>${parquet.format.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </reporting>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
new file mode 100644
index 0000000..4bf4c99
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -0,0 +1,587 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.*;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.util.BytesUtils;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+
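+/**
+ * Character-delimited text storage. {@link CSVAppender} serializes tuples as delimited
+ * rows through a pluggable {@link SerializerDeserializer}, optionally compressed;
+ * {@link CSVScanner} reads rows page by page and supports seeking on uncompressed files.
+ */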
+public class CSVFile {
+
+  public static final byte LF = '\n';
+  public static int EOF = -1;
+
+  private static final Log LOG = LogFactory.getLog(CSVFile.class);
+
+  public static class CSVAppender extends FileAppender {
+    private final TableMeta meta;
+    private final Schema schema;
+    private final int columnNum;
+    private final FileSystem fs;
+    private FSDataOutputStream fos;
+    private DataOutputStream outputStream;
+    private CompressionOutputStream deflateFilter;
+    private char delimiter;
+    private TableStatistics stats = null;
+    private Compressor compressor;
+    private CompressionCodecFactory codecFactory;
+    private CompressionCodec codec;
+    private Path compressedPath;
+    private byte[] nullChars;
+    private int BUFFER_SIZE = 128 * 1024;
+    private int bufferedBytes = 0;
+    private long pos = 0;
+    private boolean isShuffle;
+
+    private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+    private SerializerDeserializer serde;
+
+    public CSVAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
+                       final Schema schema, final TableMeta meta, final Path workDir) throws IOException {
+      super(conf, taskAttemptId, schema, meta, workDir);
+      this.fs = workDir.getFileSystem(conf);
+      this.meta = meta;
+      this.schema = schema;
+      this.delimiter = StringEscapeUtils.unescapeJava(
+          this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
+
+      this.columnNum = schema.size();
+
+      String nullCharacters = StringEscapeUtils.unescapeJava(
+          this.meta.getOption(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT));
+
+      if (StringUtils.isEmpty(nullCharacters)) {
+        nullChars = NullDatum.get().asTextBytes();
+      } else {
+        nullChars = nullCharacters.getBytes();
+      }
+    }
+
+    @Override
+    public void init() throws IOException {
+      if (!fs.exists(path.getParent())) {
+        throw new FileNotFoundException(path.getParent().toString());
+      }
+
+      //determine the intermediate file type
+      String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
+          TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
+      if (enabledStats && CatalogProtos.StoreType.CSV == CatalogProtos.StoreType.valueOf(store.toUpperCase())) {
+        isShuffle = true;
+      } else {
+        isShuffle = false;
+      }
+
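+      // With a configured compression codec, wrap the output stream in the codec and
+      // append the codec's default extension to the file name.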
+      if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
+        String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
+        codecFactory = new CompressionCodecFactory(conf);
+        codec = codecFactory.getCodecByClassName(codecName);
+        compressor =  CodecPool.getCompressor(codec);
+        if(compressor != null) compressor.reset();  //builtin gzip is null
+
+        String extension = codec.getDefaultExtension();
+        compressedPath = path.suffix(extension);
+
+        if (fs.exists(compressedPath)) {
+          throw new AlreadyExistsStorageException(compressedPath);
+        }
+
+        fos = fs.create(compressedPath);
+        deflateFilter = codec.createOutputStream(fos, compressor);
+        outputStream = new DataOutputStream(deflateFilter);
+
+      } else {
+        if (fs.exists(path)) {
+          throw new AlreadyExistsStorageException(path);
+        }
+        fos = fs.create(path);
+        outputStream = new DataOutputStream(new BufferedOutputStream(fos));
+      }
+
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+
+      try {
+        // It will be removed, because we will add a custom serde to the text file format.
+        String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
+            TextSerializerDeserializer.class.getName());
+        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        throw new IOException(e);
+      }
+
+      os.reset();
+      pos = fos.getPos();
+      bufferedBytes = 0;
+      super.init();
+    }
+
+
+    @Override
+    public void addTuple(Tuple tuple) throws IOException {
+      Datum datum;
+      int rowBytes = 0;
+
+      for (int i = 0; i < columnNum; i++) {
+        datum = tuple.get(i);
+        rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars);
+
+        if(columnNum - 1 > i){
+          os.write((byte) delimiter);
+          rowBytes += 1;
+        }
+        if (isShuffle) {
+          // it is to calculate min/max values, and it is only used for the intermediate file.
+          stats.analyzeField(i, datum);
+        }
+      }
+      os.write(LF);
+      rowBytes += 1;
+
+      pos += rowBytes;
+      bufferedBytes += rowBytes;
+      if(bufferedBytes > BUFFER_SIZE){
+        flushBuffer();
+      }
+      // Statistical section
+      if (enabledStats) {
+        stats.incrementRow();
+      }
+    }
+
+    private void flushBuffer() throws IOException {
+      if(os.getLength() > 0) {
+        os.writeTo(outputStream);
+        os.reset();
+        bufferedBytes = 0;
+      }
+    }
+    @Override
+    public long getOffset() throws IOException {
+      return pos;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      flushBuffer();
+      outputStream.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+
+      try {
+        flush();
+
+        // Statistical section
+        if (enabledStats) {
+          stats.setNumBytes(getOffset());
+        }
+
+        if(deflateFilter != null) {
+          deflateFilter.finish();
+          deflateFilter.resetState();
+          deflateFilter = null;
+        }
+
+        os.close();
+      } finally {
+        IOUtils.cleanup(LOG, fos);
+        if (compressor != null) {
+          CodecPool.returnCompressor(compressor);
+          compressor = null;
+        }
+      }
+    }
+
+    @Override
+    public TableStats getStats() {
+      if (enabledStats) {
+        return stats.getTableStat();
+      } else {
+        return null;
+      }
+    }
+
+    public boolean isCompress() {
+      return compressor != null;
+    }
+
+    public String getExtension() {
+      return codec != null ? codec.getDefaultExtension() : "";
+    }
+  }
+
+  public static class CSVScanner extends FileScanner implements SeekableScanner {
+    public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment)
+        throws IOException {
+      super(conf, schema, meta, fragment);
+      factory = new CompressionCodecFactory(conf);
+      codec = factory.getCodec(this.fragment.getPath());
+      if (codec == null || codec instanceof SplittableCompressionCodec) {
+        splittable = true;
+      }
+
+      //Delimiter
+      this.delimiter = StringEscapeUtils.unescapeJava(
+          meta.getOption(StorageConstants.TEXT_DELIMITER,
+          meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))).charAt(0);
+
+      String nullCharacters = StringEscapeUtils.unescapeJava(
+          meta.getOption(StorageConstants.TEXT_NULL,
+          meta.getOption(StorageConstants.CSVFILE_NULL, NullDatum.DEFAULT_TEXT)));
+
+      if (StringUtils.isEmpty(nullCharacters)) {
+        nullChars = NullDatum.get().asTextBytes();
+      } else {
+        nullChars = nullCharacters.getBytes();
+      }
+    }
+
+    private final static int DEFAULT_PAGE_SIZE = 256 * 1024;
+    private char delimiter;
+    private FileSystem fs;
+    private FSDataInputStream fis;
+    private InputStream is; // decompressed stream
+    private CompressionCodecFactory factory;
+    private CompressionCodec codec;
+    private Decompressor decompressor;
+    private Seekable filePosition;
+    private boolean splittable = false;
+    private long startOffset, end, pos;
+    private int currentIdx = 0, validIdx = 0, recordCount = 0;
+    private int[] targetColumnIndexes;
+    private boolean eof = false;
+    private final byte[] nullChars;
+    private SplitLineReader reader;
+    private ArrayList<Long> fileOffsets;
+    private ArrayList<Integer> rowLengthList;
+    private ArrayList<Integer> startOffsets;
+    private NonSyncByteArrayOutputStream buffer;
+    private SerializerDeserializer serde;
+
+    @Override
+    public void init() throws IOException {
+      fileOffsets = new ArrayList<Long>();
+      rowLengthList = new ArrayList<Integer>();
+      startOffsets = new ArrayList<Integer>();
+      buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
+
+      // FileFragment information
+      if(fs == null) {
+        fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath());
+      }
+      if(fis == null) fis = fs.open(fragment.getPath());
+
+      recordCount = 0;
+      pos = startOffset = fragment.getStartKey();
+      end = startOffset + fragment.getLength();
+
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+        if (codec instanceof SplittableCompressionCodec) {
+          SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
+              fis, decompressor, startOffset, end,
+              SplittableCompressionCodec.READ_MODE.BYBLOCK);
+
+          reader = new CompressedSplitLineReader(cIn, conf, null);
+          startOffset = cIn.getAdjustedStart();
+          end = cIn.getAdjustedEnd();
+          filePosition = cIn;
+          is = cIn;
+        } else {
+          is = new DataInputStream(codec.createInputStream(fis, decompressor));
+          reader = new SplitLineReader(is, null);
+          filePosition = fis;
+        }
+      } else {
+        fis.seek(startOffset);
+        filePosition = fis;
+        is = fis;
+        reader = new SplitLineReader(is, null);
+      }
+
+      if (targets == null) {
+        targets = schema.toArray();
+      }
+
+      targetColumnIndexes = new int[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
+      }
+
+      try {
+        //FIXME
+        String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
+            TextSerializerDeserializer.class.getName());
+        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        throw new IOException(e);
+      }
+
+      super.init();
+      Arrays.sort(targetColumnIndexes);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end +
+            "," + fs.getFileStatus(fragment.getPath()).getLen());
+      }
+
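+      // If this split does not start at the beginning of the file, the first (partial)
+      // line belongs to the previous split, so skip it.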
+      if (startOffset != 0) {
+        pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos));
+      }
+      eof = false;
+      page();
+    }
+
+    private int maxBytesToConsume(long pos) {
+      return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
+    }
+
+    private long fragmentable() throws IOException {
+      return end - getFilePosition();
+    }
+
+    private long getFilePosition() throws IOException {
+      long retVal;
+      if (isCompress()) {
+        retVal = filePosition.getPos();
+      } else {
+        retVal = pos;
+      }
+      return retVal;
+    }
+
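+    // Fills the page buffer with up to DEFAULT_PAGE_SIZE bytes of lines, recording each
+    // row's buffer offset, length, and file offset so next() and seek() can address rows.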
+    private void page() throws IOException {
+      // Index initialization
+      currentIdx = 0;
+      validIdx = 0;
+      int currentBufferPos = 0;
+      int bufferedSize = 0;
+
+      buffer.reset();
+      startOffsets.clear();
+      rowLengthList.clear();
+      fileOffsets.clear();
+
+      if(eof) {
+        return;
+      }
+
+      while (DEFAULT_PAGE_SIZE >= bufferedSize){
+
+        int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
+
+        if(ret == 0){
+          break;
+        } else {
+          fileOffsets.add(pos);
+          pos += ret;
+          startOffsets.add(currentBufferPos);
+          currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
+          bufferedSize += ret;
+          validIdx++;
+          recordCount++;
+        }
+
+        if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){
+          eof = true;
+          break;
+        }
+      }
+      if (tableStats != null) {
+        tableStats.setReadBytes(pos - startOffset);
+        tableStats.setNumRows(recordCount);
+      }
+    }
+
+    @Override
+    public float getProgress() {
+      try {
+        if(eof) {
+          return 1.0f;
+        }
+        long filePos = getFilePosition();
+        if (startOffset == filePos) {
+          return 0.0f;
+        } else {
+          long readBytes = filePos - startOffset;
+          long remainingBytes = Math.max(end - filePos, 0);
+          return Math.min(1.0f, (float)(readBytes) / (float)(readBytes + remainingBytes));
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return 0.0f;
+      }
+    }
+
+    @Override
+    public Tuple next() throws IOException {
+      try {
+        if (currentIdx == validIdx) {
+          if (eof) {
+            return null;
+          } else {
+            page();
+
+            if(currentIdx == validIdx){
+              return null;
+            }
+          }
+        }
+
+        long offset = -1;
+        if(!isCompress()){
+          offset = fileOffsets.get(currentIdx);
+        }
+
+        byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
+            rowLengthList.get(currentIdx), delimiter, targetColumnIndexes);
+        currentIdx++;
+        return new LazyTuple(schema, cells, offset, nullChars, serde);
+      } catch (Throwable t) {
+        LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
+        LOG.error("Tuple list current index: " + currentIdx, t);
+        throw new IOException(t);
+      }
+    }
+
+    private boolean isCompress() {
+      return codec != null;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
+      }
+
+      init();
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        if (tableStats != null) {
+          tableStats.setReadBytes(pos - startOffset);  //Actual Processed Bytes. (decompressed bytes + overhead)
+          tableStats.setNumRows(recordCount);
+        }
+
+        IOUtils.cleanup(LOG, reader, is, fis);
+        fs = null;
+        is = null;
+        fis = null;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("CSVScanner processed record:" + recordCount);
+        }
+      } finally {
+        if (decompressor != null) {
+          CodecPool.returnDecompressor(decompressor);
+          decompressor = null;
+        }
+      }
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return true;
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public void setSearchCondition(Object expr) {
+    }
+
+    @Override
+    public void seek(long offset) throws IOException {
+      if(isCompress()) throw new UnsupportedException();
+
+      int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset);
+
+      if (tupleIndex > -1) {
+        this.currentIdx = tupleIndex;
+      } else if ((isSplittable() && end >= offset) || startOffset <= offset) {
+        eof = false;
+        fis.seek(offset);
+        pos = offset;
+        reader.reset();
+        this.currentIdx = 0;
+        this.validIdx = 0;
+        // pageBuffer();
+      } else {
+        throw new IOException("invalid offset " +
+            " < start : " +  startOffset + " , " +
+            "  end : " + end + " , " +
+            "  filePos : " + filePosition.getPos() + " , " +
+            "  input offset : " + offset + " >");
+      }
+    }
+
+    @Override
+    public long getNextOffset() throws IOException {
+      if(isCompress()) throw new UnsupportedException();
+
+      if (this.currentIdx == this.validIdx) {
+        if (fragmentable() <= 0) {
+          return -1;
+        } else {
+          page();
+          if(currentIdx == validIdx) return -1;
+        }
+      }
+      return fileOffsets.get(currentIdx);
+    }
+
+    @Override
+    public boolean isSplittable(){
+      return splittable;
+    }
+  }
+}
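
A minimal standalone sketch of the paging pattern used by the scanner above: each call to
page() appends rows to a single byte buffer while recording every row's start offset and
length, and next() later slices rows back out by index. The sketch uses only plain JDK
types (no Tajo classes); all names are illustrative.

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PagedLineBuffer {
  private final byte[] data;
  private final List<Integer> startOffsets = new ArrayList<Integer>();
  private final List<Integer> rowLengths = new ArrayList<Integer>();

  public PagedLineBuffer(String page) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    BufferedReader reader = new BufferedReader(new StringReader(page));
    String line;
    int currentBufferPos = 0;
    while ((line = reader.readLine()) != null) {
      byte[] row = line.getBytes(StandardCharsets.UTF_8);
      startOffsets.add(currentBufferPos);  // where this row starts inside the page buffer
      rowLengths.add(row.length);          // how many bytes the row occupies
      buffer.write(row);
      currentBufferPos += row.length;
    }
    data = buffer.toByteArray();
  }

  // Returns the idx-th buffered row, analogous to slicing buffer.getData() in next() above.
  public String row(int idx) {
    return new String(data, startOffsets.get(idx), rowLengths.get(idx), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    PagedLineBuffer page = new PagedLineBuffer("a|1\nb|2\nc|3\n");
    System.out.println(page.row(1));  // prints "b|2"
  }
}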

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
new file mode 100644
index 0000000..4f58e68
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+/**
+ * Line reader for compressed splits
+ *
+ * Reading records from a compressed split is tricky, as the
+ * LineRecordReader uses the reported compressed input stream
+ * position directly to determine when a split has ended.  In addition, the
+ * compressed input stream usually fakes the actual byte position, often
+ * updating it only after the first compressed block after the split is
+ * accessed.
+ *
+ * Depending upon where the last compressed block of the split ends relative
+ * to the record delimiters it can be easy to accidentally drop the last
+ * record or duplicate the last record between this split and the next.
+ *
+ * Split end scenarios:
+ *
+ * 1) Last block of split ends in the middle of a record
+ *      Nothing special that needs to be done here, since the compressed input
+ *      stream will report a position after the split end once the record
+ *      is fully read.  The consumer of the next split will discard the
+ *      partial record at the start of the split normally, and no data is lost
+ *      or duplicated between the splits.
+ *
+ * 2) Last block of split ends in the middle of a delimiter
+ *      The line reader will continue to consume bytes into the next block to
+ *      locate the end of the delimiter.  If a custom delimiter is being used
+ *      then the next record must be read by this split or it will be dropped.
+ *      The consumer of the next split will not recognize the partial
+ *      delimiter at the beginning of its split and will discard it along with
+ *      the next record.
+ *
+ *      However for the default delimiter processing there is a special case
+ *      because CR, LF, and CRLF are all valid record delimiters.  If the
+ *      block ends with a CR then the reader must peek at the next byte to see
+ *      if it is an LF and therefore part of the same record delimiter.
+ *      Peeking at the next byte is an access to the next block and triggers
+ *      the stream to report the end of the split.  There are two cases based
+ *      on the next byte:
+ *
+ *      A) The next byte is LF
+ *           The split needs to end after the current record is returned.  The
+ *           consumer of the next split will discard the first record, which
+ *           is degenerate since LF is itself a delimiter, and start consuming
+ *           records after that byte.  If the current split tries to read
+ *           another record then the record will be duplicated between splits.
+ *
+ *      B) The next byte is not LF
+ *           The current record will be returned but the stream will report
+ *           the split has ended due to the peek into the next block.  If the
+ *           next record is not read then it will be lost, as the consumer of
+ *           the next split will discard it before processing subsequent
+ *           records.  Therefore the next record beyond the reported split end
+ *           must be consumed by this split to avoid data loss.
+ *
+ * 3) Last block of split ends at the beginning of a delimiter
+ *      This is equivalent to case 1, as the reader will consume bytes into
+ *      the next block and trigger the end of the split.  No further records
+ *      should be read as the consumer of the next split will discard the
+ *      (degenerate) record at the beginning of its split.
+ *
+ * 4) Last block of split ends at the end of a delimiter
+ *      Nothing special needs to be done here. The reader will not start
+ *      examining the bytes into the next block until the next record is read,
+ *      so the stream will not report the end of the split just yet.  Once the
+ *      next record is read then the next block will be accessed and the
+ *      stream will indicate the end of the split.  The consumer of the next
+ *      split will correctly discard the first record of its split, and no
+ *      data is lost or duplicated.
+ *
+ *      If the default delimiter is used and the block ends at a CR then this
+ *      is treated as case 2 since the reader does not yet know without
+ *      looking at subsequent bytes whether the delimiter has ended.
+ *
+ * NOTE: It is assumed that compressed input streams *never* return bytes from
+ *       multiple compressed blocks from a single read.  Failure to do so will
+ *       violate the buffering performed by this class, as it will access
+ *       bytes into the next block after the split before returning all of the
+ *       records from the previous block.
+ */
+
+public class CompressedSplitLineReader extends SplitLineReader {
+  SplitCompressionInputStream scin;
+  private boolean usingCRLF;
+  private boolean needAdditionalRecord = false;
+  private boolean finished = false;
+
+  public CompressedSplitLineReader(SplitCompressionInputStream in,
+                                   Configuration conf,
+                                   byte[] recordDelimiterBytes)
+      throws IOException {
+    super(in, conf, recordDelimiterBytes);
+    scin = in;
+    usingCRLF = (recordDelimiterBytes == null);
+  }
+
+  @Override
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    int bytesRead = in.read(buffer);
+
+    // If the split ended in the middle of a record delimiter then we need
+    // to read one additional record, as the consumer of the next split will
+    // not recognize the partial delimiter as a record.
+    // However if using the default delimiter and the next character is a
+    // linefeed then next split will treat it as a delimiter all by itself
+    // and the additional record read should not be performed.
+    if (inDelimiter && bytesRead > 0) {
+      if (usingCRLF) {
+        needAdditionalRecord = (buffer[0] != '\n');
+      } else {
+        needAdditionalRecord = true;
+      }
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    int bytesRead = 0;
+    if (!finished) {
+      // only allow at most one more record to be read after the stream
+      // reports the split ended
+      if (scin.getPos() > scin.getAdjustedEnd()) {
+        finished = true;
+      }
+
+      bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
+      , int maxBytesToConsume) throws IOException {
+    int bytesRead = 0;
+    if (!finished) {
+      // only allow at most one more record to be read after the stream
+      // reports the split ended
+      if (scin.getPos() > scin.getAdjustedEnd()) {
+        finished = true;
+      }
+
+      bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume);
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public boolean needAdditionalRecordAfterSplit() {
+    return !finished && needAdditionalRecord;
+  }
+}
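
The split-end handling described in the class comment above comes down to the single boolean
decision made in fillBuffer(): does this split need to read one more record beyond the
reported split end? The sketch below restates that decision as a standalone method so the
cases can be exercised directly; the method and parameter names are illustrative, not part
of the Tajo or Hadoop API.

public class SplitDelimiterRule {

  // inDelimiter: the split boundary fell inside a record delimiter
  // usingCRLF:   the default CR/LF delimiter handling applies (no custom delimiter)
  // nextByte:    first byte read from the block after the split
  static boolean needAdditionalRecord(boolean inDelimiter, boolean usingCRLF, byte nextByte) {
    if (!inDelimiter) {
      return false;              // e.g. cases 1 and 4 above: nothing special to do
    }
    if (usingCRLF) {
      return nextByte != '\n';   // case 2B: next byte is not LF, so consume one more record
    }
    return true;                 // custom delimiter split mid-delimiter: always read one more
  }

  public static void main(String[] args) {
    System.out.println(needAdditionalRecord(true, true, (byte) '\n'));  // false (case 2A)
    System.out.println(needAdditionalRecord(true, true, (byte) 'x'));   // true  (case 2B)
  }
}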

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
new file mode 100644
index 0000000..47f67c6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+
+import java.io.IOException;
+
+public abstract class FileAppender implements Appender {
+  private static final Log LOG = LogFactory.getLog(FileAppender.class);
+
+  protected boolean inited = false;
+
+  protected final Configuration conf;
+  protected final TableMeta meta;
+  protected final Schema schema;
+  protected final Path workDir;
+  protected final QueryUnitAttemptId taskAttemptId;
+
+  protected boolean enabledStats;
+  protected Path path;
+
+  public FileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema,
+                      TableMeta meta, Path workDir) {
+    this.conf = conf;
+    this.meta = meta;
+    this.schema = schema;
+    this.workDir = workDir;
+    this.taskAttemptId = taskAttemptId;
+
+    try {
+      if (taskAttemptId != null) {
+        this.path = ((FileStorageManager)StorageManager.getFileStorageManager((TajoConf) conf))
+            .getAppenderFilePath(taskAttemptId, workDir);
+      } else {
+        this.path = workDir;
+      }
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      throw new IllegalStateException("Error while opeining FileAppender: " + e.getMessage(), e);
+    }
+  }
+
+  public void init() throws IOException {
+    if (inited) {
+      throw new IllegalStateException("FileAppender is already initialized.");
+    }
+    inited = true;
+  }
+
+  public void enableStats() {
+    if (inited) {
+      throw new IllegalStateException("Should enable this option before init()");
+    }
+
+    this.enabledStats = true;
+  }
+
+  public long getEstimatedOutputSize() throws IOException {
+    return getOffset();
+  }
+
+  public abstract long getOffset() throws IOException;
+}
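
A short standalone sketch of the lifecycle contract enforced above: configuration calls such
as enableStats() must happen before init(), and init() may run only once. The class below
only mirrors that pattern; it is not the Tajo Appender API and all names are illustrative.

public class AppenderLifecycleSketch {
  private boolean inited = false;
  private boolean enabledStats = false;

  public void enableStats() {
    if (inited) {
      throw new IllegalStateException("Should enable this option before init()");
    }
    enabledStats = true;
  }

  public void init() {
    if (inited) {
      throw new IllegalStateException("Already initialized.");
    }
    inited = true;
  }

  public static void main(String[] args) {
    AppenderLifecycleSketch appender = new AppenderLifecycleSketch();
    appender.enableStats();      // allowed: configuration before init()
    appender.init();             // allowed: first and only call
    try {
      appender.enableStats();    // rejected: configuration after init()
    } catch (IllegalStateException expected) {
      System.out.println(expected.getMessage());
    }
  }
}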

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
new file mode 100644
index 0000000..038f0f4
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+
+public abstract class FileScanner implements Scanner {
+  private static final Log LOG = LogFactory.getLog(FileScanner.class);
+
+  protected boolean inited = false;
+  protected final Configuration conf;
+  protected final TableMeta meta;
+  protected final Schema schema;
+  protected final FileFragment fragment;
+  protected final int columnNum;
+
+  protected Column [] targets;
+
+  protected float progress;
+
+  protected TableStats tableStats;
+
+  public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) {
+    this.conf = conf;
+    this.meta = meta;
+    this.schema = schema;
+    this.fragment = (FileFragment)fragment;
+    this.tableStats = new TableStats();
+    this.columnNum = this.schema.size();
+  }
+
+  public void init() throws IOException {
+    inited = true;
+    progress = 0.0f;
+
+    if (fragment != null) {
+      tableStats.setNumBytes(fragment.getLength());
+      tableStats.setNumBlocks(1);
+    }
+
+    if (schema != null) {
+      for(Column eachColumn: schema.getColumns()) {
+        ColumnStats columnStats = new ColumnStats(eachColumn);
+        tableStats.addColumnStat(columnStats);
+      }
+    }
+  }
+
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public void setTarget(Column[] targets) {
+    if (inited) {
+      throw new IllegalStateException("Should be called before init()");
+    }
+    this.targets = targets;
+  }
+
+  public void setSearchCondition(Object expr) {
+    if (inited) {
+      throw new IllegalStateException("Should be called before init()");
+    }
+  }
+
+  public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException {
+    String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME);
+    FileSystem fs;
+    if(tajoUser != null) {
+      try {
+        fs = FileSystem.get(path.toUri(), tajoConf, tajoUser);
+      } catch (InterruptedException e) {
+        LOG.warn("Occur InterruptedException while FileSystem initiating with user[" + tajoUser + "]");
+        fs = FileSystem.get(path.toUri(), tajoConf);
+      }
+    } else {
+      fs = FileSystem.get(path.toUri(), tajoConf);
+    }
+
+    return fs;
+  }
+
+  @Override
+  public float getProgress() {
+    return progress;
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return tableStats;
+  }
+}
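
The getFileSystem() helper above resolves the file system on behalf of a configured user and
falls back to the default user when the lookup is interrupted. Below is a standalone sketch of
the same pattern written directly against the Hadoop FileSystem API (hadoop-common on the
classpath; no Tajo types, class and method names illustrative).

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UserAwareFileSystem {

  public static FileSystem open(Configuration conf, Path path, String user) throws IOException {
    if (user != null) {
      try {
        // Resolve the file system on behalf of the given user.
        return FileSystem.get(path.toUri(), conf, user);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve the interrupt flag, then fall back
      }
    }
    // Fall back to the current process user.
    return FileSystem.get(path.toUri(), conf);
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("file:///tmp");
    System.out.println(open(conf, path, System.getProperty("user.name")).getUri());
  }
}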