You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:55 UTC

[18/18] git commit: TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)

TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/bbf9b7bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/bbf9b7bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/bbf9b7bf

Branch: refs/heads/master
Commit: bbf9b7bf8b60a930b4e44754c732f9629762d1bf
Parents: 6aa96fa
Author: jinossy <ji...@gmail.com>
Authored: Tue Jan 28 21:34:41 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Tue Jan 28 21:34:41 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |    2 +
 pom.xml                                         |    1 +
 tajo-client/pom.xml                             |    2 +-
 tajo-core/pom.xml                               |    2 -
 tajo-core/tajo-core-backend/pom.xml             |    2 +-
 tajo-core/tajo-core-pullserver/pom.xml          |    2 +-
 tajo-core/tajo-core-storage/pom.xml             |  301 ---
 .../tajo/storage/AbstractStorageManager.java    |  690 -------
 .../java/org/apache/tajo/storage/Appender.java  |   39 -
 .../storage/BinarySerializerDeserializer.java   |  257 ---
 .../java/org/apache/tajo/storage/CSVFile.java   |  531 -----
 .../tajo/storage/CompressedSplitLineReader.java |  182 --
 .../org/apache/tajo/storage/DataLocation.java   |   45 -
 .../org/apache/tajo/storage/FileAppender.java   |   61 -
 .../org/apache/tajo/storage/FileScanner.java    |   93 -
 .../org/apache/tajo/storage/FrameTuple.java     |  231 ---
 .../java/org/apache/tajo/storage/LazyTuple.java |  291 ---
 .../org/apache/tajo/storage/LineReader.java     |  559 ------
 .../org/apache/tajo/storage/MergeScanner.java   |  154 --
 .../tajo/storage/NumericPathComparator.java     |   34 -
 .../java/org/apache/tajo/storage/RawFile.java   |  532 -----
 .../java/org/apache/tajo/storage/RowFile.java   |  506 -----
 .../org/apache/tajo/storage/RowStoreUtil.java   |  206 --
 .../java/org/apache/tajo/storage/Scanner.java   |   94 -
 .../apache/tajo/storage/SeekableScanner.java    |   28 -
 .../tajo/storage/SerializerDeserializer.java    |   34 -
 .../apache/tajo/storage/SplitLineReader.java    |   39 -
 .../java/org/apache/tajo/storage/Storage.java   |   45 -
 .../org/apache/tajo/storage/StorageManager.java |   67 -
 .../tajo/storage/StorageManagerFactory.java     |   98 -
 .../org/apache/tajo/storage/StorageUtil.java    |   83 -
 .../apache/tajo/storage/TableStatistics.java    |  117 --
 .../storage/TextSerializerDeserializer.java     |  209 --
 .../java/org/apache/tajo/storage/Tuple.java     |   82 -
 .../apache/tajo/storage/TupleComparator.java    |  159 --
 .../org/apache/tajo/storage/TupleRange.java     |  103 -
 .../java/org/apache/tajo/storage/VTuple.java    |  226 ---
 .../storage/annotation/ForSplitableStore.java   |   29 -
 .../apache/tajo/storage/compress/CodecPool.java |  185 --
 .../AlreadyExistsStorageException.java          |   39 -
 .../exception/UnknownCodecException.java        |   32 -
 .../exception/UnknownDataTypeException.java     |   32 -
 .../exception/UnsupportedFileTypeException.java |   36 -
 .../tajo/storage/fragment/FileFragment.java     |  219 ---
 .../apache/tajo/storage/fragment/Fragment.java  |   31 -
 .../storage/fragment/FragmentConvertor.java     |  123 --
 .../apache/tajo/storage/index/IndexMethod.java  |   32 -
 .../apache/tajo/storage/index/IndexReader.java  |   35 -
 .../apache/tajo/storage/index/IndexWriter.java  |   33 -
 .../tajo/storage/index/OrderIndexReader.java    |   45 -
 .../apache/tajo/storage/index/bst/BSTIndex.java |  623 ------
 .../storage/rcfile/BytesRefArrayWritable.java   |  261 ---
 .../tajo/storage/rcfile/BytesRefWritable.java   |  248 ---
 .../storage/rcfile/ColumnProjectionUtils.java   |  117 --
 .../rcfile/LazyDecompressionCallback.java       |   32 -
 .../rcfile/NonSyncByteArrayInputStream.java     |  113 --
 .../rcfile/NonSyncByteArrayOutputStream.java    |  144 --
 .../storage/rcfile/NonSyncDataInputBuffer.java  |  507 -----
 .../storage/rcfile/NonSyncDataOutputBuffer.java |   91 -
 .../org/apache/tajo/storage/rcfile/RCFile.java  | 1739 -----------------
 .../SchemaAwareCompressionInputStream.java      |   43 -
 .../SchemaAwareCompressionOutputStream.java     |   44 -
 .../tajo/storage/trevni/TrevniAppender.java     |  201 --
 .../tajo/storage/trevni/TrevniScanner.java      |  193 --
 .../apache/tajo/storage/v2/CSVFileScanner.java  |  386 ----
 .../apache/tajo/storage/v2/DiskDeviceInfo.java  |   62 -
 .../tajo/storage/v2/DiskFileScanScheduler.java  |  205 --
 .../org/apache/tajo/storage/v2/DiskInfo.java    |   75 -
 .../apache/tajo/storage/v2/DiskMountInfo.java   |  101 -
 .../org/apache/tajo/storage/v2/DiskUtil.java    |  199 --
 .../apache/tajo/storage/v2/FileScanRunner.java  |   70 -
 .../apache/tajo/storage/v2/FileScannerV2.java   |  203 --
 .../java/org/apache/tajo/storage/v2/RCFile.java | 1823 ------------------
 .../apache/tajo/storage/v2/RCFileScanner.java   |  297 ---
 .../apache/tajo/storage/v2/ScanScheduler.java   |  189 --
 .../tajo/storage/v2/ScheduledInputStream.java   |  513 -----
 .../tajo/storage/v2/StorageManagerV2.java       |  140 --
 .../src/main/proto/IndexProtos.proto            |   29 -
 .../src/main/resources/storage-default.xml      |  149 --
 .../tajo/storage/TestCompressionStorages.java   |  233 ---
 .../org/apache/tajo/storage/TestFrameTuple.java |   84 -
 .../org/apache/tajo/storage/TestLazyTuple.java  |  258 ---
 .../apache/tajo/storage/TestMergeScanner.java   |  179 --
 .../apache/tajo/storage/TestStorageManager.java |   93 -
 .../org/apache/tajo/storage/TestStorages.java   |  375 ----
 .../tajo/storage/TestTupleComparator.java       |   77 -
 .../org/apache/tajo/storage/TestVTuple.java     |  160 --
 .../apache/tajo/storage/index/TestBSTIndex.java |  948 ---------
 .../index/TestSingleCSVFileBSTIndex.java        |  248 ---
 .../tajo/storage/v2/TestCSVCompression.java     |  213 --
 .../apache/tajo/storage/v2/TestCSVScanner.java  |  168 --
 .../apache/tajo/storage/v2/TestStorages.java    |  242 ---
 .../src/test/resources/storage-default.xml      |  149 --
 tajo-dist/pom.xml                               |    6 +
 tajo-jdbc/pom.xml                               |    2 +-
 tajo-project/pom.xml                            |    2 +-
 tajo-storage/pom.xml                            |  383 ++++
 .../tajo/storage/AbstractStorageManager.java    |  690 +++++++
 .../java/org/apache/tajo/storage/Appender.java  |   39 +
 .../storage/BinarySerializerDeserializer.java   |  257 +++
 .../java/org/apache/tajo/storage/CSVFile.java   |  531 +++++
 .../tajo/storage/CompressedSplitLineReader.java |  182 ++
 .../org/apache/tajo/storage/DataLocation.java   |   45 +
 .../org/apache/tajo/storage/FileAppender.java   |   61 +
 .../org/apache/tajo/storage/FileScanner.java    |   93 +
 .../org/apache/tajo/storage/FrameTuple.java     |  231 +++
 .../java/org/apache/tajo/storage/LazyTuple.java |  291 +++
 .../org/apache/tajo/storage/LineReader.java     |  559 ++++++
 .../org/apache/tajo/storage/MergeScanner.java   |  154 ++
 .../tajo/storage/NumericPathComparator.java     |   34 +
 .../java/org/apache/tajo/storage/RawFile.java   |  532 +++++
 .../java/org/apache/tajo/storage/RowFile.java   |  506 +++++
 .../org/apache/tajo/storage/RowStoreUtil.java   |  206 ++
 .../java/org/apache/tajo/storage/Scanner.java   |   94 +
 .../apache/tajo/storage/SeekableScanner.java    |   28 +
 .../tajo/storage/SerializerDeserializer.java    |   34 +
 .../apache/tajo/storage/SplitLineReader.java    |   39 +
 .../java/org/apache/tajo/storage/Storage.java   |   45 +
 .../org/apache/tajo/storage/StorageManager.java |   67 +
 .../tajo/storage/StorageManagerFactory.java     |   98 +
 .../org/apache/tajo/storage/StorageUtil.java    |   83 +
 .../apache/tajo/storage/TableStatistics.java    |  117 ++
 .../storage/TextSerializerDeserializer.java     |  209 ++
 .../java/org/apache/tajo/storage/Tuple.java     |   82 +
 .../apache/tajo/storage/TupleComparator.java    |  159 ++
 .../org/apache/tajo/storage/TupleRange.java     |  103 +
 .../java/org/apache/tajo/storage/VTuple.java    |  226 +++
 .../storage/annotation/ForSplitableStore.java   |   29 +
 .../apache/tajo/storage/compress/CodecPool.java |  185 ++
 .../AlreadyExistsStorageException.java          |   39 +
 .../exception/UnknownCodecException.java        |   32 +
 .../exception/UnknownDataTypeException.java     |   32 +
 .../exception/UnsupportedFileTypeException.java |   36 +
 .../tajo/storage/fragment/FileFragment.java     |  219 +++
 .../apache/tajo/storage/fragment/Fragment.java  |   31 +
 .../storage/fragment/FragmentConvertor.java     |  123 ++
 .../apache/tajo/storage/index/IndexMethod.java  |   32 +
 .../apache/tajo/storage/index/IndexReader.java  |   35 +
 .../apache/tajo/storage/index/IndexWriter.java  |   33 +
 .../tajo/storage/index/OrderIndexReader.java    |   45 +
 .../apache/tajo/storage/index/bst/BSTIndex.java |  623 ++++++
 .../storage/rcfile/BytesRefArrayWritable.java   |  261 +++
 .../tajo/storage/rcfile/BytesRefWritable.java   |  248 +++
 .../storage/rcfile/ColumnProjectionUtils.java   |  117 ++
 .../rcfile/LazyDecompressionCallback.java       |   32 +
 .../rcfile/NonSyncByteArrayInputStream.java     |  113 ++
 .../rcfile/NonSyncByteArrayOutputStream.java    |  144 ++
 .../storage/rcfile/NonSyncDataInputBuffer.java  |  507 +++++
 .../storage/rcfile/NonSyncDataOutputBuffer.java |   91 +
 .../org/apache/tajo/storage/rcfile/RCFile.java  | 1739 +++++++++++++++++
 .../SchemaAwareCompressionInputStream.java      |   43 +
 .../SchemaAwareCompressionOutputStream.java     |   44 +
 .../tajo/storage/trevni/TrevniAppender.java     |  201 ++
 .../tajo/storage/trevni/TrevniScanner.java      |  193 ++
 .../apache/tajo/storage/v2/CSVFileScanner.java  |  386 ++++
 .../apache/tajo/storage/v2/DiskDeviceInfo.java  |   62 +
 .../tajo/storage/v2/DiskFileScanScheduler.java  |  205 ++
 .../org/apache/tajo/storage/v2/DiskInfo.java    |   75 +
 .../apache/tajo/storage/v2/DiskMountInfo.java   |  101 +
 .../org/apache/tajo/storage/v2/DiskUtil.java    |  199 ++
 .../apache/tajo/storage/v2/FileScanRunner.java  |   70 +
 .../apache/tajo/storage/v2/FileScannerV2.java   |  203 ++
 .../java/org/apache/tajo/storage/v2/RCFile.java | 1823 ++++++++++++++++++
 .../apache/tajo/storage/v2/RCFileScanner.java   |  297 +++
 .../apache/tajo/storage/v2/ScanScheduler.java   |  189 ++
 .../tajo/storage/v2/ScheduledInputStream.java   |  513 +++++
 .../tajo/storage/v2/StorageManagerV2.java       |  140 ++
 tajo-storage/src/main/proto/IndexProtos.proto   |   29 +
 .../src/main/resources/storage-default.xml      |  149 ++
 .../tajo/storage/TestCompressionStorages.java   |  233 +++
 .../org/apache/tajo/storage/TestFrameTuple.java |   84 +
 .../org/apache/tajo/storage/TestLazyTuple.java  |  258 +++
 .../apache/tajo/storage/TestMergeScanner.java   |  179 ++
 .../apache/tajo/storage/TestStorageManager.java |   93 +
 .../org/apache/tajo/storage/TestStorages.java   |  375 ++++
 .../tajo/storage/TestTupleComparator.java       |   77 +
 .../org/apache/tajo/storage/TestVTuple.java     |  160 ++
 .../apache/tajo/storage/index/TestBSTIndex.java |  948 +++++++++
 .../index/TestSingleCSVFileBSTIndex.java        |  248 +++
 .../tajo/storage/v2/TestCSVCompression.java     |  213 ++
 .../apache/tajo/storage/v2/TestCSVScanner.java  |  168 ++
 .../apache/tajo/storage/v2/TestStorages.java    |  242 +++
 .../src/test/resources/storage-default.xml      |  149 ++
 183 files changed, 19287 insertions(+), 19198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3f2bff..9699782 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -386,6 +386,8 @@ Release 0.8.0 - unreleased
 
   TASKS
 
+    TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)
+
     TAJO-536: Fix warnings in tajo-core-storage. (jinho)
 
     TAJO-545: MySQLStore Documentation. (jaehwa)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d70f0e4..569e9be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
     <module>tajo-client</module>
     <module>tajo-jdbc</module>
     <module>tajo-dist</module>
+    <module>tajo-storage</module>
   </modules>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-client/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-client/pom.xml b/tajo-client/pom.xml
index 9b4cd5e..c6fbb27 100644
--- a/tajo-client/pom.xml
+++ b/tajo-client/pom.xml
@@ -201,7 +201,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-core-storage</artifactId>
+      <artifactId>tajo-storage</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index 09978c3..e131d0e 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -34,7 +34,6 @@
 
   <modules>
     <module>tajo-core-backend</module>
-	  <module>tajo-core-storage</module>
 	  <module>tajo-core-pullserver</module>
   </modules>
 
@@ -161,7 +160,6 @@
                       run rm -rf ${project.artifactId}-${project.version}
                       run mkdir ${project.artifactId}-${project.version}
                       run cd ${project.artifactId}-${project.version}
-                      run cp -r ${basedir}/${project.artifactId}-storage/target/${project.artifactId}-storage-${project.version}*.jar .
                       run cp -r ${basedir}/${project.artifactId}-pullserver/target/${project.artifactId}-pullserver-${project.version}*.jar .
                       run cp -r ${basedir}/${project.artifactId}-backend/target/${project.artifactId}-backend-${project.version}*.jar .
                       run cp -r ${basedir}/${project.artifactId}-backend/target/lib .

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index abc217e..fce9925 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -213,7 +213,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-core-storage</artifactId>
+      <artifactId>tajo-storage</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/pom.xml b/tajo-core/tajo-core-pullserver/pom.xml
index 8c6d4fe..0bdfed2 100644
--- a/tajo-core/tajo-core-pullserver/pom.xml
+++ b/tajo-core/tajo-core-pullserver/pom.xml
@@ -44,7 +44,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-core-storage</artifactId>
+      <artifactId>tajo-storage</artifactId>
       <scope>provided</scope>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/pom.xml b/tajo-core/tajo-core-storage/pom.xml
deleted file mode 100644
index dcbde44..0000000
--- a/tajo-core/tajo-core-storage/pom.xml
+++ /dev/null
@@ -1,301 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Copyright 2012 Database Lab., Korea Univ.
-
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>tajo-project</artifactId>
-    <groupId>org.apache.tajo</groupId>
-    <version>0.8.0-SNAPSHOT</version>
-    <relativePath>../../tajo-project</relativePath>
-  </parent>
-  <artifactId>tajo-core-storage</artifactId>
-  <packaging>jar</packaging>
-  <name>Tajo Core Storage</name>
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-  </properties>
-
-  <repositories>
-    <repository>
-      <id>repository.jboss.org</id>
-      <url>https://repository.jboss.org/nexus/content/repositories/releases/
-            </url>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </repository>
-  </repositories>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
-          <encoding>${project.build.sourceEncoding}</encoding>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <version>2.4</version>
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-antrun-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>create-protobuf-generated-sources-directory</id>
-            <phase>initialize</phase>
-            <configuration>
-              <target>
-                <mkdir dir="target/generated-sources/proto" />
-              </target>
-            </configuration>
-            <goals>
-              <goal>run</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>exec-maven-plugin</artifactId>
-        <version>1.2</version>
-        <executions>
-          <execution>
-            <id>generate-sources</id>
-            <phase>generate-sources</phase>
-            <configuration>
-              <executable>protoc</executable>
-              <arguments>
-                <argument>-Isrc/main/proto/</argument>
-                <argument>--proto_path=../../tajo-common/src/main/proto</argument>
-                <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
-                <argument>--java_out=target/generated-sources/proto</argument>
-                <argument>src/main/proto/IndexProtos.proto</argument>
-              </arguments>
-            </configuration>
-            <goals>
-              <goal>exec</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <version>1.5</version>
-        <executions>
-          <execution>
-            <id>add-source</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>target/generated-sources/proto</source>
-              </sources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-pmd-plugin</artifactId>
-        <version>2.7.1</version>
-      </plugin>
-    </plugins>
-  </build>
-
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-catalog-common</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>trevni-core</artifactId>
-      <version>1.7.3</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>trevni-avro</artifactId>
-      <version>1.7.3</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <exclusions>
-      <exclusion>
-        <groupId>commons-el</groupId>
-        <artifactId>commons-el</artifactId>
-      </exclusion>
-      <exclusion>
-        <groupId>tomcat</groupId>
-        <artifactId>jasper-runtime</artifactId>
-      </exclusion>
-      <exclusion>
-        <groupId>tomcat</groupId>
-        <artifactId>jasper-compiler</artifactId>
-      </exclusion>
-      <exclusion>
-        <groupId>org.mortbay.jetty</groupId>
-        <artifactId>jsp-2.1-jetty</artifactId>
-      </exclusion>
-        <exclusion>
-          <groupId>com.sun.jersey.jersey-test-framework</groupId>
-          <artifactId>jersey-test-framework-grizzly2</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.code.gson</groupId>
-      <artifactId>gson</artifactId>
-      <type>jar</type>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <version>2.6</version>
-    </dependency>
-  </dependencies>
-
-  <profiles>
-    <profile>
-      <id>docs</id>
-      <activation>
-        <activeByDefault>false</activeByDefault>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-javadoc-plugin</artifactId>
-            <executions>
-              <execution>
-                <!-- build javadoc jars per jar for publishing to maven -->
-                <id>module-javadocs</id>
-                <phase>package</phase>
-                <goals>
-                  <goal>jar</goal>
-                </goals>
-                <configuration>
-                  <destDir>${project.build.directory}</destDir>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
-      <id>src</id>
-      <activation>
-        <activeByDefault>false</activeByDefault>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-source-plugin</artifactId>
-            <executions>
-              <execution>
-                <!-- builds source jars and attaches them to the project for publishing -->
-                <id>tajo-java-sources</id>
-                <phase>package</phase>
-                <goals>
-                  <goal>jar-no-fork</goal>
-                </goals>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-
-  <reporting>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-project-info-reports-plugin</artifactId>
-        <version>2.4</version>
-        <configuration>
-          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
-        </configuration>
-      </plugin>
-    </plugins>
-  </reporting>
-
-</project>
-

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
deleted file mode 100644
index 91a535e..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
+++ /dev/null
@@ -1,690 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.net.util.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.FileUtil;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public abstract class AbstractStorageManager {
-  private final Log LOG = LogFactory.getLog(AbstractStorageManager.class);
-
-  protected final TajoConf conf;
-  protected final FileSystem fs;
-  protected final Path tableBaseDir;
-  protected final boolean blocksMetadataEnabled;
-
-  /**
-   * Cache of scanner handlers for each storage type.
-   */
-  protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends Scanner>>();
-
-  /**
-   * Cache of appender handlers for each storage type.
-   */
-  protected static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends FileAppender>>();
-
-  /**
-   * Cache of constructors for each class. Pins the classes so they
-   * can't be garbage collected until ReflectionUtils can be collected.
-   */
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
-      new ConcurrentHashMap<Class<?>, Constructor<?>>();
-
-  public abstract Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException;
-
-  public abstract Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException;
-
-  protected AbstractStorageManager(TajoConf conf) throws IOException {
-    this.conf = conf;
-    this.tableBaseDir = TajoConf.getWarehouseDir(conf);
-    this.fs = tableBaseDir.getFileSystem(conf);
-    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
-        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
-    if (!this.blocksMetadataEnabled)
-      LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
-  }
-
-  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
-      throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    FileStatus status = fs.getFileStatus(path);
-    FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
-    return getScanner(meta, schema, fragment);
-  }
-
-  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment) throws IOException {
-    return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), schema);
-  }
-
-  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
-    return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), target);
-  }
-
-  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
-    return getScanner(meta, schema, fragment, schema);
-  }
-
-  public FileSystem getFileSystem() {
-    return this.fs;
-  }
-
-  public Path getWarehouseDir() {
-    return this.tableBaseDir;
-  }
-
-  public void delete(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    fs.delete(tablePath, true);
-  }
-
-  public boolean exists(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    return fileSystem.exists(path);
-  }
-
-  /**
-   * This method deletes only data contained in the given path.
-   *
-   * @param path The path in which data are deleted.
-   * @throws IOException
-   */
-  public void deleteData(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    FileStatus[] fileLists = fileSystem.listStatus(path);
-    for (FileStatus status : fileLists) {
-      fileSystem.delete(status.getPath(), true);
-    }
-  }
-
-  public Path getTablePath(String tableName) {
-    return new Path(tableBaseDir, tableName);
-  }
-
-  /**
-   * Creates an appender for the store type declared in the given table meta.
-   *
-   * The handler class is looked up in {@link #APPENDER_HANDLER_CACHE} first and, on a miss,
-   * resolved from the configuration key
-   * <code>tajo.storage.appender-handler.&lt;store type in lower case&gt;.class</code>.
-   *
-   * @param meta   table meta whose store type selects the appender implementation
-   * @param schema schema handed to the appender constructor
-   * @param path   path handed to the appender constructor
-   * @return a newly constructed appender
-   * @throws IOException if no appender handler is configured for the store type
-   */
-  public Appender getAppender(TableMeta meta, Schema schema, Path path)
-      throws IOException {
-    String handlerName = meta.getStoreType().name().toLowerCase();
-
-    Class<? extends FileAppender> appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
-    if (appenderClass == null) {
-      appenderClass = conf.getClass(
-          String.format("tajo.storage.appender-handler.%s.class", handlerName), null,
-          FileAppender.class);
-
-      // ConcurrentHashMap rejects null values: an unconfigured store type has to be reported
-      // before it reaches the cache, otherwise put() fails with a NullPointerException and
-      // the caller never sees the "Unknown Storage Type" error.
-      if (appenderClass == null) {
-        throw new IOException("Unknown Storage Type: " + meta.getStoreType());
-      }
-      APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
-    }
-
-    return newAppenderInstance(appenderClass, conf, meta, schema, path);
-  }
-
-
-  /**
-   * Loads the table meta stored in the <code>.meta</code> file directly under the given table path.
-   *
-   * @param tablePath directory of the table
-   * @return the table meta built from the serialized TableProto
-   * @throws FileNotFoundException if the <code>.meta</code> file does not exist
-   * @throws IOException if the file cannot be opened or read
-   */
-  public TableMeta getTableMeta(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    Path tableMetaPath = new Path(tablePath, ".meta");
-    if (!fs.exists(tableMetaPath)) {
-      throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
-    }
-
-    FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
-    try {
-      CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) FileUtil.loadProto(tableMetaIn,
-          CatalogProtos.TableProto.getDefaultInstance());
-      return new TableMeta(tableProto);
-    } finally {
-      // the stream was opened here, so it is released here even when parsing fails
-      tableMetaIn.close();
-    }
-  }
-
-  public FileFragment[] split(String tableName) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
-    return split(tableName, tablePath, fs.getDefaultBlockSize());
-  }
-
-  public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
-    return split(tableName, tablePath, fragmentSize);
-  }
-
-  public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
-      listTablets.add(tablet);
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public FileFragment[] split(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
-  }
-
-  public FileFragment[] split(String tableName, Path tablePath) throws IOException {
-    return split(tableName, tablePath, fs.getDefaultBlockSize());
-  }
-
-  /**
-   * Cuts every entry listed directly under the table path into fragments of at most
-   * <code>size</code> bytes. An entry that is not larger than <code>size</code>
-   * (including an empty one) yields exactly one fragment starting at offset 0.
-   *
-   * @param tableName fragment id given to every produced fragment
-   * @param tablePath directory whose entries are split
-   * @param size      maximum fragment length in bytes; must be positive
-   * @return the fragments in listing order
-   * @throws IllegalArgumentException if <code>size</code> is not positive
-   * @throws IOException if the directory cannot be listed
-   */
-  private FileFragment[] split(String tableName, Path tablePath, long size)
-      throws IOException {
-    // a non-positive size would never shrink the remaining length and loop forever
-    if (size <= 0) {
-      throw new IllegalArgumentException("fragment size must be positive: " + size);
-    }
-
-    FileSystem fs = tablePath.getFileSystem(conf);
-    List<FileFragment> fragments = new ArrayList<FileFragment>();
-
-    for (FileStatus file : fs.listStatus(tablePath)) {
-      long remaining = file.getLen();
-      long start = 0;
-      while (remaining > size) {
-        fragments.add(new FileFragment(tableName, file.getPath(), start, size));
-        start += size;
-        remaining -= size;
-      }
-      // tail fragment; for a file not larger than 'size' this is the only one (start == 0)
-      fragments.add(new FileFragment(tableName, file.getPath(), start, remaining));
-    }
-
-    return fragments.toArray(new FileFragment[fragments.size()]);
-  }
-
-  /**
-   * Static variant of the table split: cuts every entry listed directly under the table path
-   * into fragments of at most <code>size</code> bytes. An entry that is not larger than
-   * <code>size</code> (including an empty one) yields exactly one fragment starting at offset 0.
-   *
-   * @param conf      configuration used to resolve the file system of the table path
-   * @param tableName fragment id given to every produced fragment
-   * @param meta      not used by this method
-   * @param tablePath directory whose entries are split
-   * @param size      maximum fragment length in bytes; must be positive
-   * @return the fragments in listing order
-   * @throws IllegalArgumentException if <code>size</code> is not positive
-   * @throws IOException if the directory cannot be listed
-   */
-  public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
-                                   Path tablePath, long size)
-      throws IOException {
-    // a non-positive size would never shrink the remaining length and loop forever
-    if (size <= 0) {
-      throw new IllegalArgumentException("fragment size must be positive: " + size);
-    }
-
-    FileSystem fs = tablePath.getFileSystem(conf);
-    List<FileFragment> fragments = new ArrayList<FileFragment>();
-
-    for (FileStatus file : fs.listStatus(tablePath)) {
-      long remaining = file.getLen();
-      long start = 0;
-      while (remaining > size) {
-        fragments.add(new FileFragment(tableName, file.getPath(), start, size));
-        start += size;
-        remaining -= size;
-      }
-      // tail fragment; for a file not larger than 'size' this is the only one (start == 0)
-      fragments.add(new FileFragment(tableName, file.getPath(), start, remaining));
-    }
-
-    return fragments.toArray(new FileFragment[fragments.size()]);
-  }
-
-  public long calculateSize(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    long totalSize = 0;
-
-    if (fs.exists(tablePath)) {
-      totalSize = fs.getContentSummary(tablePath).getLength();
-    }
-
-    return totalSize;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // FileInputFormat Area
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final PathFilter hiddenFileFilter = new PathFilter() {
-    public boolean accept(Path p) {
-      String name = p.getName();
-      return !name.startsWith("_") && !name.startsWith(".");
-    }
-  };
-
-  /**
-   * Proxy PathFilter that accepts a path only if all filters given in the
-   * constructor do. Used by the listPaths() to apply the built-in
-   * hiddenFileFilter together with a user provided one (if any).
-   */
-  private static class MultiPathFilter implements PathFilter {
-    private List<PathFilter> filters;
-
-    public MultiPathFilter(List<PathFilter> filters) {
-      this.filters = filters;
-    }
-
-    public boolean accept(Path path) {
-      for (PathFilter filter : filters) {
-        if (!filter.accept(path)) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
-  /**
-   * List input directories.
-   * Subclasses may override to, e.g., select only files matching a regular
-   * expression.
-   *
-   * @return array of FileStatus objects
-   * @throws IOException if zero items.
-   */
-  protected List<FileStatus> listStatus(Path path) throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
-    Path[] dirs = new Path[]{path};
-    if (dirs.length == 0) {
-      throw new IOException("No input paths specified in job");
-    }
-
-    List<IOException> errors = new ArrayList<IOException>();
-
-    // creates a MultiPathFilter with the hiddenFileFilter and the
-    // user provided one (if any).
-    List<PathFilter> filters = new ArrayList<PathFilter>();
-    filters.add(hiddenFileFilter);
-
-    PathFilter inputFilter = new MultiPathFilter(filters);
-
-    for (int i = 0; i < dirs.length; ++i) {
-      Path p = dirs[i];
-
-      FileSystem fs = p.getFileSystem(conf);
-      FileStatus[] matches = fs.globStatus(p, inputFilter);
-      if (matches == null) {
-        errors.add(new IOException("Input path does not exist: " + p));
-      } else if (matches.length == 0) {
-        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
-      } else {
-        for (FileStatus globStat : matches) {
-          if (globStat.isDirectory()) {
-            for (FileStatus stat : fs.listStatus(globStat.getPath(),
-                inputFilter)) {
-              result.add(stat);
-            }
-          } else {
-            result.add(globStat);
-          }
-        }
-      }
-    }
-
-    if (!errors.isEmpty()) {
-      throw new InvalidInputException(errors);
-    }
-    LOG.info("Total input paths to process : " + result.size());
-    return result;
-  }
-
-  /**
-   * Get the lower bound on split size imposed by the format.
-   *
-   * @return the number of bytes of the minimal split for this format
-   */
-  protected long getFormatMinSplitSize() {
-    return 1;
-  }
-
-  /**
-   * Is the given filename splitable? Usually, true, but if the file is
-   * stream compressed, it will not be.
-   * <p/>
-   * <code>FileInputFormat</code> implementations can override this and return
-   * <code>false</code> to ensure that individual input files are never split-up
-   * so that Mappers process entire files.
-   *
-   *
-   * @param filename the file name to check
-   * @return is this file isSplittable?
-   */
-  protected boolean isSplittable(TableMeta meta, Schema schema, Path filename) throws IOException {
-    Scanner scanner = getFileScanner(meta, schema, filename);
-    return scanner.isSplittable();
-  }
-
-  @Deprecated
-  protected long computeSplitSize(long blockSize, long minSize,
-                                  long maxSize) {
-    return Math.max(minSize, Math.min(maxSize, blockSize));
-  }
-
-  @Deprecated
-  private static final double SPLIT_SLOP = 1.1;   // 10% slop
-
-  @Deprecated
-  protected int getBlockIndex(BlockLocation[] blkLocations,
-                              long offset) {
-    for (int i = 0; i < blkLocations.length; i++) {
-      // is the offset inside this block?
-      if ((blkLocations[i].getOffset() <= offset) &&
-          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
-        return i;
-      }
-    }
-    BlockLocation last = blkLocations[blkLocations.length - 1];
-    long fileLength = last.getOffset() + last.getLength() - 1;
-    throw new IllegalArgumentException("Offset " + offset +
-        " is outside of file (0.." +
-        fileLength + ")");
-  }
-
-  /**
-   * A factory that makes the split for this class. It can be overridden
-   * by sub-classes to make sub-types
-   */
-  protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
-    return new FileFragment(fragmentId, file, start, length);
-  }
-
-  protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
-                               int[] diskIds) throws IOException {
-    return new FileFragment(fragmentId, file, blockLocation, diskIds);
-  }
-
-  /**
-   * Builds a single fragment for a file that must not be split (e.g. a gzip-compressed text
-   * file). The fragment's hosts are the hosts that store the most blocks of the file, ordered
-   * from most to fewest blocks; at most as many hosts are returned as the first block has
-   * replicas.
-   *
-   * @param fragmentId   id of the fragment
-   * @param meta         not used by this implementation
-   * @param file         path of the file
-   * @param start        start offset of the fragment
-   * @param length       length of the fragment
-   * @param blkLocations block locations of the file
-   * @return the fragment covering the requested range
-   */
-  protected FileFragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
-                                  BlockLocation[] blkLocations) throws IOException {
-
-    // count how many blocks of the file each host stores
-    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
-    for (BlockLocation blockLocation : blkLocations) {
-      for (String host : blockLocation.getHosts()) {
-        Integer count = hostsBlockMap.get(host);
-        hostsBlockMap.put(host, count == null ? 1 : count + 1);
-      }
-    }
-
-    // ascending by block count; the best hosts are taken from the tail below
-    List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
-    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
-
-      @Override
-      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
-        return v1.getValue().compareTo(v2.getValue());
-      }
-    });
-
-    // The first block's replica count bounds the number of hosts, but there may be fewer
-    // distinct host names than replicas (several datanodes on one host) or no block at all;
-    // never index past the collected entries.
-    int replication = blkLocations.length == 0 ? 0 : blkLocations[0].getHosts().length;
-    String[] hosts = new String[Math.min(replication, entries.size())];
-
-    for (int i = 0; i < hosts.length; i++) {
-      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
-      hosts[i] = entry.getKey();
-    }
-    return new FileFragment(fragmentId, file, start, length, hosts);
-  }
-
-  /**
-   * Get the maximum split size.
-   *
-   * @return the maximum number of bytes a split can include
-   */
-  @Deprecated
-  public static long getMaxSplitSize() {
-    // TODO - to be configurable
-    return 536870912L;
-  }
-
-  /**
-   * Get the minimum split size
-   *
-   * @return the minimum number of bytes that can be in a split
-   */
-  @Deprecated
-  public static long getMinSplitSize() {
-    // TODO - to be configurable
-    return 67108864L;
-  }
-
-  /**
-   * Get Disk Ids by Volume Bytes
-   */
-  private int[] getDiskIds(VolumeId[] volumeIds) {
-    int[] diskIds = new int[volumeIds.length];
-    for (int i = 0; i < volumeIds.length; i++) {
-      int diskId = -1;
-      if (volumeIds[i] != null && volumeIds[i].isValid()) {
-        String volumeIdString = volumeIds[i].toString();
-        byte[] volumeIdBytes = Base64.decodeBase64(volumeIdString);
-
-        if (volumeIdBytes.length == 4) {
-          diskId = Bytes.toInt(volumeIdBytes);
-        } else if (volumeIdBytes.length == 1) {
-          diskId = (int) volumeIdBytes[0];  // support hadoop-2.0.2
-        }
-      }
-      diskIds[i] = diskId;
-    }
-    return diskIds;
-  }
-
-  /**
-   * Collects, for every host referenced by the given fragments, the set of disk (volume) ids
-   * reported for that host. A host always gets an entry, even when no valid disk id
-   * (a value greater than -1) is known for it.
-   *
-   * @param frags fragments to inspect
-   * @return map from host name to the set of known disk ids on that host
-   */
-  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
-    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
-    for (FileFragment frag : frags) {
-      String[] hosts = frag.getHosts();
-      int[] diskIds = frag.getDiskIds();
-      for (int i = 0; i < hosts.length; i++) {
-        Set<Integer> volumeList = volumeMap.get(hosts[i]);
-        if (volumeList == null) {
-          volumeList = new HashSet<Integer>();
-          volumeMap.put(hosts[i], volumeList);
-        }
-
-        // diskIds is indexed by host position; it may be missing or shorter than hosts,
-        // so bound the index instead of only checking that the array is non-empty
-        if (diskIds != null && i < diskIds.length && diskIds[i] > -1) {
-          volumeList.add(diskIds[i]);
-        }
-      }
-    }
-
-    return volumeMap;
-  }
-  /**
-   * Generates the fragments (splits) for the files under the given input path.
-   *
-   * If the input path is a file, only that file is processed; otherwise the entries returned
-   * by {@link #listStatus(Path)} are processed. For every file:
-   * <ul>
-   *   <li>a zero-length file yields one fragment of length 0,</li>
-   *   <li>a splittable file yields one fragment per block location,</li>
-   *   <li>a non-splittable file yields one fragment covering the whole file, except in the
-   *       volume-aware branch when the first block is at least as long as the file, where one
-   *       fragment per block storage location is produced.</li>
-   * </ul>
-   * Disk ids are attached to the fragments only when block metadata is enabled and the file
-   * system is a DistributedFileSystem.
-   *
-   * @param tableName fragment id given to every produced fragment
-   * @param meta      table meta, used to decide whether a file is splittable
-   * @param schema    schema, used to decide whether a file is splittable
-   * @param inputPath file or directory to generate fragments for
-   * @return the generated fragments
-   * @throws IOException
-   */
-  public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException {
-    // generate splits
-
-    List<FileFragment> splits = new ArrayList<FileFragment>();
-    FileSystem fs = inputPath.getFileSystem(conf);
-    List<FileStatus> files;
-    if (fs.isFile(inputPath)) {
-      files = Lists.newArrayList(fs.getFileStatus(inputPath));
-    } else {
-      // directory (or glob): listStatus() filters out names starting with '_' or '.'
-      files = listStatus(inputPath);
-    }
-    for (FileStatus file : files) {
-      Path path = file.getPath();
-      long length = file.getLen();
-      if (length > 0) {
-        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-        boolean splittable = isSplittable(meta, schema, path);
-        if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
-          // supported disk volume
-          BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
-              .getFileBlockStorageLocations(Arrays.asList(blkLocations));
-          if (splittable) {
-            // one fragment per block, carrying the disk ids decoded from the volume ids
-            for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
-              splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
-                  .getVolumeIds())));
-            }
-          } else { // Non splittable
-            long blockSize = blockStorageLocations[0].getLength();
-            if (blockSize >= length) {
-              // the first block already covers the file length, so per-block fragments
-              // (with disk ids) are kept instead of building a host-only fragment
-              for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
-                splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
-                    .getVolumeIds())));
-              }
-            } else {
-              // file spans several blocks: a single fragment for the whole file
-              splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
-            }
-          }
-
-        } else {
-          // no volume information available: fragments are created without disk ids
-          if (splittable) {
-            for (BlockLocation blockLocation : blkLocations) {
-              splits.add(makeSplit(tableName, meta, path, blockLocation, null));
-            }
-          } else { // Non splittable
-            splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
-          }
-        }
-      } else {
-        //for zero length files
-        splits.add(makeSplit(tableName, meta, path, 0, length));
-      }
-    }
-
-    LOG.info("Total # of splits: " + splits.size());
-    return splits;
-  }
-
-  /**
-   * Aggregates several I/O errors into one exception. The message is the concatenation of
-   * the first {@link #MESSAGE_LIMIT} error messages, one per line, followed by a
-   * "skipped ....." line when further errors were left out.
-   */
-  private static class InvalidInputException extends IOException {
-    private static final long serialVersionUID = 1L;
-
-    /** Maximum number of individual error messages included in the message. */
-    private static final int MESSAGE_LIMIT = 10;
-
-    private final List<IOException> errors;
-
-    public InvalidInputException(List<IOException> errors) {
-      this.errors = errors;
-    }
-
-    @Override
-    public String getMessage() {
-      // the builder is confined to this call, so the synchronized StringBuffer is unnecessary
-      StringBuilder sb = new StringBuilder();
-      int messageLimit = Math.min(errors.size(), MESSAGE_LIMIT);
-      for (int i = 0; i < messageLimit; i++) {
-        sb.append(errors.get(i).getMessage()).append("\n");
-      }
-
-      if (messageLimit < errors.size()) {
-        sb.append("skipped .....").append("\n");
-      }
-
-      return sb.toString();
-    }
-  }
-
-  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
-      Configuration.class,
-      Schema.class,
-      TableMeta.class,
-      FileFragment.class
-  };
-
-  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
-      Configuration.class,
-      Schema.class,
-      TableMeta.class,
-      Path.class
-  };
-
-  /**
-   * Creates a scanner instance reflectively.
-   *
-   * The constructor taking (Configuration, Schema, TableMeta, FileFragment) is looked up once
-   * per class, made accessible, and kept in CONSTRUCTOR_CACHE for subsequent calls.
-   *
-   * @param theClass class to instantiate
-   * @param conf     first constructor argument
-   * @param schema   second constructor argument
-   * @param meta     third constructor argument
-   * @param fragment fourth constructor argument; the looked-up constructor declares
-   *                 FileFragment for this position
-   * @return the new instance
-   * @throws RuntimeException wrapping any exception raised while resolving or invoking the
-   *                          constructor
-   */
-  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
-                                         Fragment fragment) {
-    T result;
-    try {
-      // unchecked cast: the cache stores Constructor<?> keyed by the class it was resolved from
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
-
-  /**
-   * Creates an appender instance reflectively.
-   *
-   * The constructor taking (Configuration, Schema, TableMeta, Path) is looked up once per
-   * class, made accessible, and kept in CONSTRUCTOR_CACHE for subsequent calls.
-   * Note that this method receives <code>meta</code> before <code>schema</code>, while the
-   * constructor is invoked with <code>schema</code> before <code>meta</code>.
-   *
-   * @param theClass class to instantiate
-   * @param conf     first constructor argument
-   * @param meta     third constructor argument
-   * @param schema   second constructor argument
-   * @param path     fourth constructor argument
-   * @return the new instance
-   * @throws RuntimeException wrapping any exception raised while resolving or invoking the
-   *                          constructor
-   */
-  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta, Schema schema,
-                                          Path path) {
-    T result;
-    try {
-      // unchecked cast: the cache stores Constructor<?> keyed by the class it was resolved from
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(new Object[]{conf, schema, meta, path});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java
deleted file mode 100644
index ed6ea34..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.statistics.TableStats;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Contract for writers that accept tuples and can report table statistics.
- * NOTE(review): the expected call order (init, addTuple..., flush, close) is presumed from
- * the method names; confirm against the implementations.
- */
-public interface Appender extends Closeable {
-
-  /** Prepares the appender; presumably must be called before the first addTuple — confirm. */
-  void init() throws IOException;
-
-  /** Hands one tuple to the appender. */
-  void addTuple(Tuple t) throws IOException;
-  
-  /** Flushes the appender; buffering behavior is implementation specific. */
-  void flush() throws IOException;
-  
-  /** Closes the appender; redeclares {@link Closeable#close()}. */
-  void close() throws IOException;
-
-  /** Turns on statistics collection; presumably required for getStats() to be meaningful — confirm. */
-  void enableStats();
-  
-  /** Returns the statistics object of this appender. */
-  TableStats getStats();
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
deleted file mode 100644
index ed034be..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Message;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.util.Bytes;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class BinarySerializerDeserializer implements SerializerDeserializer {
-
-  static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
-
-  @Override
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
-      throws IOException {
-    byte[] bytes;
-    int length = 0;
-    if (datum == null || datum instanceof NullDatum) {
-      return 0;
-    }
-
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-      case BIT:
-      case CHAR:
-        bytes = datum.asByteArray();
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case INT2:
-        length = writeShort(out, datum.asInt2());
-        break;
-      case INT4:
-        length = writeVLong(out, datum.asInt4());
-        break;
-      case INT8:
-        length = writeVLong(out, datum.asInt8());
-        break;
-      case FLOAT4:
-        length = writeFloat(out, datum.asFloat4());
-        break;
-      case FLOAT8:
-        length = writeDouble(out, datum.asFloat8());
-        break;
-      case TEXT: {
-        bytes = datum.asTextBytes();
-        length = datum.size();
-        if (length == 0) {
-          bytes = INVALID_UTF__SINGLE_BYTE;
-          length = INVALID_UTF__SINGLE_BYTE.length;
-        }
-        out.write(bytes, 0, bytes.length);
-        break;
-      }
-      case BLOB:
-      case INET4:
-      case INET6:
-        bytes = datum.asByteArray();
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case PROTOBUF:
-        ProtobufDatum protobufDatum = (ProtobufDatum) datum;
-        bytes = protobufDatum.asByteArray();
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case NULL_TYPE:
-        break;
-      default:
-        throw new IOException("Does not support type");
-    }
-    return length;
-  }
-
-  @Override
-  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
-    if (length == 0) return NullDatum.get();
-
-    Datum datum;
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-        datum = DatumFactory.createBool(bytes[offset]);
-        break;
-      case BIT:
-        datum = DatumFactory.createBit(bytes[offset]);
-        break;
-      case CHAR: {
-        byte[] chars = new byte[length];
-        System.arraycopy(bytes, offset, chars, 0, length);
-        datum = DatumFactory.createChar(chars);
-        break;
-      }
-      case INT2:
-        datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
-        break;
-      case INT4:
-        datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
-        break;
-      case INT8:
-        datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
-        break;
-      case FLOAT4:
-        datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
-        break;
-      case FLOAT8:
-        datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
-        break;
-      case TEXT: {
-        byte[] chars = new byte[length];
-        System.arraycopy(bytes, offset, chars, 0, length);
-
-        if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
-          datum = DatumFactory.createText(new byte[0]);
-        } else {
-          datum = DatumFactory.createText(chars);
-        }
-        break;
-      }
-      case PROTOBUF: {
-        ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
-        Message.Builder builder = factory.newBuilder();
-        builder.mergeFrom(bytes, offset, length);
-        datum = factory.createDatum(builder);
-        break;
-      }
-      case INET4:
-        datum = DatumFactory.createInet4(bytes, offset, length);
-        break;
-      case BLOB:
-        datum = DatumFactory.createBlob(bytes, offset, length);
-        break;
-      default:
-        datum = NullDatum.get();
-    }
-    return datum;
-  }
-
-  private byte[] shortBytes = new byte[2];
-
-  public int writeShort(OutputStream out, short val) throws IOException {
-    shortBytes[0] = (byte) (val >> 8);
-    shortBytes[1] = (byte) val;
-    out.write(shortBytes, 0, 2);
-    return 2;
-  }
-
-  public float toFloat(byte[] bytes, int offset, int length) {
-    Preconditions.checkArgument(length == 4);
-
-    int val = ((bytes[offset] & 0x000000FF) << 24) +
-        ((bytes[offset + 1] & 0x000000FF) << 16) +
-        ((bytes[offset + 2] & 0x000000FF) << 8) +
-        (bytes[offset + 3] & 0x000000FF);
-    return Float.intBitsToFloat(val);
-  }
-
-  private byte[] floatBytes = new byte[4];
-
-  public int writeFloat(OutputStream out, float f) throws IOException {
-    int val = Float.floatToIntBits(f);
-
-    floatBytes[0] = (byte) (val >> 24);
-    floatBytes[1] = (byte) (val >> 16);
-    floatBytes[2] = (byte) (val >> 8);
-    floatBytes[3] = (byte) val;
-    out.write(floatBytes, 0, 4);
-    return floatBytes.length;
-  }
-
-  public double toDouble(byte[] bytes, int offset, int length) {
-    Preconditions.checkArgument(length == 8);
-    long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
-        ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
-        ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
-        ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
-        ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
-        ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
-        ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
-        (long) (bytes[offset + 7] & 0x00000000000000FF);
-    return Double.longBitsToDouble(val);
-  }
-
-  private byte[] doubleBytes = new byte[8];
-
-  public int writeDouble(OutputStream out, double d) throws IOException {
-    long val = Double.doubleToLongBits(d);
-
-    doubleBytes[0] = (byte) (val >> 56);
-    doubleBytes[1] = (byte) (val >> 48);
-    doubleBytes[2] = (byte) (val >> 40);
-    doubleBytes[3] = (byte) (val >> 32);
-    doubleBytes[4] = (byte) (val >> 24);
-    doubleBytes[5] = (byte) (val >> 16);
-    doubleBytes[6] = (byte) (val >> 8);
-    doubleBytes[7] = (byte) val;
-    out.write(doubleBytes, 0, 8);
-    return doubleBytes.length;
-  }
-
-  private byte[] vLongBytes = new byte[9];
-
-  /**
-   * Writes a long in a variable-length encoding into the given array.
-   *
-   * Values in [-112, 127] are stored as a single byte. Any other value is stored as a marker
-   * byte followed by the magnitude bytes, most significant byte first: the marker encodes the
-   * sign and the number of magnitude bytes, and negative values are stored as their one's
-   * complement.
-   * NOTE(review): the layout looks like Hadoop's WritableUtils.writeVLong — confirm before
-   * relying on cross-compatibility.
-   *
-   * @param bytes  destination array; the caller must provide enough room after offset
-   * @param offset index of the first byte to write
-   * @param l      value to encode
-   * @return number of bytes written (1 for the single-byte form, otherwise 1 + magnitude bytes)
-   */
-  public int writeVLongToByteArray(byte[] bytes, int offset, long l) {
-    if (l >= -112 && l <= 127) {
-      bytes[offset] = (byte) l;
-      return 1;
-    }
-
-    // marker base: -112 for positive values, -120 for negative values
-    int len = -112;
-    if (l < 0) {
-      l ^= -1L; // take one's complement
-      len = -120;
-    }
-
-    // decrement the marker once per magnitude byte needed
-    long tmp = l;
-    while (tmp != 0) {
-      tmp = tmp >> 8;
-      len--;
-    }
-
-    bytes[offset++] = (byte) len;
-    // recover the number of magnitude bytes from the marker
-    len = (len < -120) ? -(len + 120) : -(len + 112);
-
-    // emit magnitude bytes from most significant to least significant
-    for (int idx = len; idx != 0; idx--) {
-      int shiftbits = (idx - 1) * 8;
-      bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
-    }
-    return 1 + len;
-  }
-
-  public int writeVLong(OutputStream out, long l) throws IOException {
-    int len = writeVLongToByteArray(vLongBytes, 0, l);
-    out.write(vLongBytes, 0, len);
-    return len;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
deleted file mode 100644
index 5d05d6f..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ /dev/null
@@ -1,531 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.compress.CodecPool;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-import org.apache.tajo.util.Bytes;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-public class CSVFile {
-
-  public static final String DELIMITER = "csvfile.delimiter";
-  public static final String NULL = "csvfile.null";     //read only
-  public static final String SERDE = "csvfile.serde";
-  public static final String DELIMITER_DEFAULT = "|";
-  public static final byte LF = '\n';
-  public static int EOF = -1;
-
-  private static final Log LOG = LogFactory.getLog(CSVFile.class);
-
-  public static class CSVAppender extends FileAppender {
-    private final TableMeta meta;
-    private final Schema schema;
-    private final int columnNum;
-    private final FileSystem fs;
-    private FSDataOutputStream fos;
-    private DataOutputStream outputStream;
-    private CompressionOutputStream deflateFilter;
-    private char delimiter;
-    private TableStatistics stats = null;
-    private Compressor compressor;
-    private CompressionCodecFactory codecFactory;
-    private CompressionCodec codec;
-    private Path compressedPath;
-    private byte[] nullChars;
-    private int BUFFER_SIZE = 128 * 1024;
-    private int bufferedBytes = 0;
-    private long pos = 0;
-
-    private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
-    private SerializerDeserializer serde;
-
-    public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
-      super(conf, schema, meta, path);
-      this.fs = path.getFileSystem(conf);
-      this.meta = meta;
-      this.schema = schema;
-      this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(DELIMITER, DELIMITER_DEFAULT)).charAt(0);
-      this.columnNum = schema.getColumnNum();
-      String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(NULL));
-      if (StringUtils.isEmpty(nullCharacters)) {
-        nullChars = NullDatum.get().asTextBytes();
-      } else {
-        nullChars = nullCharacters.getBytes();
-      }
-    }
-
-    /**
-     * Opens the output file and prepares the appender for writing.
-     *
-     * When the {@code TableMeta.COMPRESSION_CODEC} option is set, output goes to
-     * {@code path} plus the codec's default extension through a compression stream;
-     * otherwise it goes to {@code path} through a buffered stream. Also instantiates
-     * the configured serializer and resets the in-memory row buffer and counters.
-     *
-     * @throws FileNotFoundException if the parent directory of {@code path} does not exist
-     * @throws AlreadyExistsStorageException if the target file already exists
-     * @throws IOException if the file cannot be created or the serde cannot be instantiated
-     */
-    @Override
-    public void init() throws IOException {
-      if (!fs.exists(path.getParent())) {
-        throw new FileNotFoundException(path.toString());
-      }
-
-      String codecName = this.meta.getOption(TableMeta.COMPRESSION_CODEC);
-      if(!StringUtils.isEmpty(codecName)){
-        codecFactory = new CompressionCodecFactory(conf);
-        // NOTE(review): codec is dereferenced below without a null check; confirm
-        // getCodecByClassName cannot return null for an unknown class name.
-        codec = codecFactory.getCodecByClassName(codecName);
-        // NOTE(review): the compressor is borrowed before the exists() check below;
-        // if that check throws, it is only returned to the pool when close() is called.
-        compressor =  CodecPool.getCompressor(codec);
-        if(compressor != null) compressor.reset();  //builtin gzip is null
-
-        String extension = codec.getDefaultExtension();
-        compressedPath = path.suffix(extension);
-
-        if (fs.exists(compressedPath)) {
-          throw new AlreadyExistsStorageException(compressedPath);
-        }
-
-        fos = fs.create(compressedPath);
-        deflateFilter = codec.createOutputStream(fos, compressor);
-        outputStream = new DataOutputStream(deflateFilter);
-
-      } else {
-        if (fs.exists(path)) {
-          throw new AlreadyExistsStorageException(path);
-        }
-        fos = fs.create(path);
-        outputStream = new DataOutputStream(new BufferedOutputStream(fos));
-      }
-
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
-      }
-
-      // The serde class is taken from the SERDE table option, defaulting to the text serde.
-      try {
-        String serdeClass = this.meta.getOption(SERDE, TextSerializerDeserializer.class.getName());
-        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        throw new IOException(e);
-      }
-
-      // Start with an empty row buffer and a position equal to the raw stream's position.
-      os.reset();
-      pos = fos.getPos();
-      bufferedBytes = 0;
-      super.init();
-    }
-
-
-    /**
-     * Serializes one tuple as a delimiter-separated line terminated by {@code LF}
-     * into the in-memory buffer, flushing the buffer to the output stream once
-     * more than {@code BUFFER_SIZE} bytes have accumulated.
-     *
-     * @param tuple the row to append; one field is read per schema column
-     * @throws IOException if serialization or flushing fails
-     */
-    @Override
-    public void addTuple(Tuple tuple) throws IOException {
-      final int lastColumn = columnNum - 1;
-      int written = 0;
-
-      for (int col = 0; col < columnNum; col++) {
-        final Datum value = tuple.get(col);
-        written += serde.serialize(schema.getColumn(col), value, os, nullChars);
-
-        // A delimiter follows every field except the last one.
-        if (col < lastColumn) {
-          os.write((byte) delimiter);
-          written++;
-        }
-        if (enabledStats) {
-          stats.analyzeField(col, value);
-        }
-      }
-      os.write(LF);
-      written++;
-
-      pos += written;
-      bufferedBytes += written;
-      if (bufferedBytes > BUFFER_SIZE) {
-        flushBuffer();
-      }
-
-      // Row-level statistics.
-      if (enabledStats) {
-        stats.incrementRow();
-      }
-    }
-
-    /**
-     * Moves any buffered row bytes to the output stream and empties the buffer.
-     * Does nothing when the buffer holds no data.
-     */
-    private void flushBuffer() throws IOException {
-      if (os.getLength() <= 0) {
-        return;
-      }
-      os.writeTo(outputStream);
-      os.reset();
-      bufferedBytes = 0;
-    }
-    /**
-     * Returns the appender's logical write position: the stream position captured
-     * in {@code init()} plus the serialized size of every row added since.
-     * Buffered rows that have not been flushed yet are included.
-     */
-    @Override
-    public long getOffset() throws IOException {
-      return pos;
-    }
-
-    /**
-     * Writes all buffered rows to the output stream and then flushes that stream.
-     *
-     * @throws IOException if writing or flushing fails
-     */
-    @Override
-    public void flush() throws IOException {
-      // Drain the in-memory row buffer first, if it holds anything.
-      if (os.getLength() > 0) {
-        os.writeTo(outputStream);
-        os.reset();
-        bufferedBytes = 0;
-      }
-      outputStream.flush();
-    }
-
-    /**
-     * Flushes pending rows, records the final byte count in the statistics,
-     * finishes the compression stream if one is in use, and releases the
-     * underlying file stream and pooled compressor.
-     *
-     * @throws IOException if flushing or finishing the compression stream fails
-     */
-    @Override
-    public void close() throws IOException {
-
-      try {
-        flush();
-
-        // Statistical section
-        if (enabledStats) {
-          stats.setNumBytes(getOffset());
-        }
-
-        // Finish the compressed stream before the raw file stream is closed in finally.
-        if(deflateFilter != null) {
-          deflateFilter.finish();
-          deflateFilter.resetState();
-          deflateFilter = null;
-        }
-
-        os.close();
-      } finally {
-        // Runs even when flushing fails: close the file stream and return the compressor.
-        IOUtils.cleanup(LOG, fos);
-        if (compressor != null) {
-          CodecPool.returnCompressor(compressor);
-          compressor = null;
-        }
-      }
-    }
-
-    /**
-     * Returns the statistics gathered while appending.
-     *
-     * @return the collected table statistics, or {@code null} when statistics are disabled
-     */
-    @Override
-    public TableStats getStats() {
-      return enabledStats ? stats.getTableStat() : null;
-    }
-
-    /**
-     * Reports whether a pooled compressor is currently held.
-     * NOTE(review): init() notes the compressor is null for builtin gzip, and
-     * close() clears it, so this can be false even when a codec is configured — confirm
-     * callers expect that.
-     */
-    public boolean isCompress() {
-      return compressor != null;
-    }
-
-    /**
-     * Returns the file extension contributed by the compression codec.
-     *
-     * @return the codec's default extension, or an empty string when no codec is set
-     */
-    public String getExtension() {
-      if (codec == null) {
-        return "";
-      }
-      return codec.getDefaultExtension();
-    }
-  }
-
-  public static class CSVScanner extends FileScanner implements SeekableScanner {
-    public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
-        throws IOException {
-      super(conf, schema, meta, fragment);
-      factory = new CompressionCodecFactory(conf);
-      codec = factory.getCodec(fragment.getPath());
-      if (codec == null || codec instanceof SplittableCompressionCodec) {
-        splittable = true;
-      }
-
-      //Delimiter
-      String delim  = meta.getOption(DELIMITER, DELIMITER_DEFAULT);
-      this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
-
-      String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(NULL));
-      if (StringUtils.isEmpty(nullCharacters)) {
-        nullChars = NullDatum.get().asTextBytes();
-      } else {
-        nullChars = nullCharacters.getBytes();
-      }
-    }
-
-    private final static int DEFAULT_PAGE_SIZE = 256 * 1024;
-    private char delimiter;
-    private FileSystem fs;
-    private FSDataInputStream fis;
-    private InputStream is; //decompressd stream
-    private CompressionCodecFactory factory;
-    private CompressionCodec codec;
-    private Decompressor decompressor;
-    private Seekable filePosition;
-    private boolean splittable = false;
-    private long startOffset, end, pos;
-    private int currentIdx = 0, validIdx = 0, recordCount = 0;
-    private int[] targetColumnIndexes;
-    private boolean eof = false;
-    private final byte[] nullChars;
-    private SplitLineReader reader;
-    private ArrayList<Long> fileOffsets = new ArrayList<Long>();
-    private ArrayList<Integer> rowLengthList = new ArrayList<Integer>();
-    private ArrayList<Integer> startOffsets = new ArrayList<Integer>();
-    private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
-    private SerializerDeserializer serde;
-
-    /**
-     * Opens the fragment's file (if not already open), sets up the line reader for
-     * the compressed or uncompressed case, resolves the projected column indexes,
-     * instantiates the serde and loads the first page of rows.
-     *
-     * @throws IOException if the file cannot be opened or read, or the serde cannot
-     *         be instantiated
-     */
-    @Override
-    public void init() throws IOException {
-
-      // FileFragment information
-      if(fs == null) {
-        fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath());
-      }
-      if(fis == null) fis = fs.open(fragment.getPath());
-
-      recordCount = 0;
-      pos = startOffset = fragment.getStartKey();
-      // getEndKey() is added to the start offset here, i.e. it is used as a length.
-      end = startOffset + fragment.getEndKey();
-
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-        if (codec instanceof SplittableCompressionCodec) {
-          SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
-              fis, decompressor, startOffset, end,
-              SplittableCompressionCodec.READ_MODE.BYBLOCK);
-
-          reader = new CompressedSplitLineReader(cIn, conf, null);
-          // The codec may move the split boundaries; adopt its adjusted values.
-          startOffset = cIn.getAdjustedStart();
-          end = cIn.getAdjustedEnd();
-          filePosition = cIn;
-          is = cIn;
-        } else {
-          // Non-splittable codec: read the decompressed stream, track position on the raw stream.
-          is = new DataInputStream(codec.createInputStream(fis, decompressor));
-          reader = new SplitLineReader(is, null);
-          filePosition = fis;
-        }
-      } else {
-        // Uncompressed: position the raw stream at the start of the fragment.
-        fis.seek(startOffset);
-        filePosition = fis;
-        is = fis;
-        reader = new SplitLineReader(is, null);
-      }
-
-      // With no explicit projection, every schema column is a target.
-      if (targets == null) {
-        targets = schema.toArray();
-      }
-
-      targetColumnIndexes = new int[targets.length];
-      for (int i = 0; i < targets.length; i++) {
-        targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
-      }
-
-      // The serde class is taken from the SERDE table option, defaulting to the text serde.
-      try {
-        String serdeClass = this.meta.getOption(SERDE, TextSerializerDeserializer.class.getName());
-        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        throw new IOException(e);
-      }
-
-      super.init();
-      Arrays.sort(targetColumnIndexes);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end +
-            "," + fs.getFileStatus(fragment.getPath()).getLen());
-      }
-
-      // A fragment that does not start at offset 0 skips its first (possibly partial) line.
-      if (startOffset != 0) {
-        startOffset += reader.readLine(new Text(), 0, maxBytesToConsume(startOffset));
-        pos = startOffset;
-      }
-      eof = false;
-      page();
-    }
-
-    private int maxBytesToConsume(long pos) {
-      return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
-    }
-
-    private long fragmentable() throws IOException {
-      return end - getFilePosition();
-    }
-
-    /**
-     * Returns the position used for end-of-fragment checks: the position reported
-     * by the underlying stream for compressed input, otherwise the scanner's own
-     * running byte counter.
-     */
-    private long getFilePosition() throws IOException {
-      return isCompress() ? filePosition.getPos() : pos;
-    }
-
-    private void page() throws IOException {
-//      // Index initialization
-      currentIdx = 0;
-      validIdx = 0;
-      int currentBufferPos = 0;
-      int bufferedSize = 0;
-
-      buffer.reset();
-      startOffsets.clear();
-      rowLengthList.clear();
-      fileOffsets.clear();
-
-      if(eof) return;
-
-      while (DEFAULT_PAGE_SIZE >= bufferedSize){
-
-        int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
-
-        if(ret == 0){
-          break;
-        } else {
-          fileOffsets.add(pos);
-          pos += ret;
-          startOffsets.add(currentBufferPos);
-          currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
-          bufferedSize += ret;
-          validIdx++;
-          recordCount++;
-        }
-
-        if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){
-          eof = true;
-          break;
-        }
-      }
-    }
-
-    /**
-     * Returns the next row of the fragment as a lazily decoded tuple, loading a new
-     * page when the current one is exhausted.
-     *
-     * @return the next tuple, or {@code null} when no rows remain
-     * @throws IOException wrapping any failure raised while reading or splitting the row
-     */
-    @Override
-    public Tuple next() throws IOException {
-      try {
-        if (currentIdx == validIdx) {
-          if (eof) {
-            return null;
-          }
-          page();
-          if (currentIdx == validIdx) {
-            return null;
-          }
-        }
-
-        // Row offsets are only meaningful for uncompressed files.
-        final long offset = isCompress() ? -1L : fileOffsets.get(currentIdx);
-
-        final byte[][] cells = Bytes.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
-            rowLengthList.get(currentIdx),  delimiter, targetColumnIndexes);
-        currentIdx++;
-        return new LazyTuple(schema, cells, offset, nullChars, serde);
-      } catch (Throwable t) {
-        LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
-        LOG.error("Tuple list current index: " + currentIdx, t);
-        throw new IOException(t);
-      }
-    }
-
-    /** Reports whether a compression codec was resolved for the fragment's path. */
-    private boolean isCompress() {
-      return codec != null;
-    }
-
-    /**
-     * Restarts the scan from the beginning of the fragment: returns any held
-     * decompressor to the pool and runs {@code init()} again.
-     */
-    @Override
-    public void reset() throws IOException {
-      // init() borrows a fresh decompressor, so hand the current one back first.
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-        decompressor = null;
-      }
-
-      init();
-    }
-
-    /**
-     * Closes the reader and input streams, clears the stream and file-system
-     * references, and returns the decompressor to the pool.
-     */
-    @Override
-    public void close() throws IOException {
-      try {
-        IOUtils.cleanup(LOG, reader, is, fis);
-        // Clearing these lets a later init() reopen the file.
-        fs = null;
-        is = null;
-        fis = null;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("CSVScanner processed record:" + recordCount);
-        }
-      } finally {
-        if (decompressor != null) {
-          CodecPool.returnDecompressor(decompressor);
-          decompressor = null;
-        }
-      }
-    }
-
-    /** Always {@code true}: init() resolves target columns and next() splits only those. */
-    @Override
-    public boolean isProjectable() {
-      return true;
-    }
-
-    /** Always {@code false}: this scanner ignores search conditions (see setSearchCondition). */
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    /** Intentionally a no-op: the given condition is not stored or applied. */
-    @Override
-    public void setSearchCondition(Object expr) {
-    }
-
-    /**
-     * Repositions the scanner so that the next row read starts at {@code offset}.
-     *
-     * If {@code offset} is the start of a row in the currently loaded page, only
-     * the page index moves. Otherwise the underlying stream is repositioned and
-     * the page is invalidated, so the next read loads a fresh page.
-     *
-     * @param offset absolute byte offset of a row start in the file
-     * @throws UnsupportedException if the file is compressed
-     * @throws IOException if the offset is rejected or the stream cannot seek
-     */
-    @Override
-    public void seek(long offset) throws IOException {
-      if (isCompress()) {
-        throw new UnsupportedException();
-      }
-
-      // Search the offset list in place; the previous toArray() call copied the
-      // whole list on every seek. page() appends offsets as pos advances.
-      final int tupleIndex = java.util.Collections.binarySearch(fileOffsets, offset);
-
-      if (tupleIndex > -1) {
-        this.currentIdx = tupleIndex;
-      } else if ((isSplittable() && end >= offset) || startOffset <= offset) {
-        // NOTE(review): && binds tighter than ||, so any offset at or after
-        // startOffset is accepted even when it lies beyond end. The parentheses only
-        // make the existing evaluation explicit — confirm whether both bounds were
-        // meant to apply.
-        eof = false;
-        fis.seek(offset);
-        pos = offset;
-        reader.reset();
-        // Equal indexes mark the page as exhausted, forcing a reload on the next read.
-        this.currentIdx = 0;
-        this.validIdx = 0;
-      } else {
-        throw new IOException("invalid offset " +
-            " < start : " +  startOffset + " , " +
-            "  end : " + end + " , " +
-            "  filePos : " + filePosition.getPos() + " , " +
-            "  input offset : " + offset + " >");
-      }
-    }
-
-    /**
-     * Returns the file offset of the row that the next call to {@code next()} would
-     * return, loading a new page if the current one is exhausted.
-     *
-     * @return the offset of the next row, or {@code -1} when no rows remain
-     * @throws UnsupportedException if the file is compressed
-     * @throws IOException if loading the next page fails
-     */
-    @Override
-    public long getNextOffset() throws IOException {
-      if (isCompress()) {
-        throw new UnsupportedException();
-      }
-
-      if (this.currentIdx == this.validIdx) {
-        if (fragmentable() <= 0) {
-          return -1;
-        }
-        page();
-        if (currentIdx == validIdx) {
-          return -1;
-        }
-      }
-      return fileOffsets.get(currentIdx);
-    }
-
-    /**
-     * Returns the flag set in the constructor: {@code true} when the file is
-     * uncompressed or its codec is a {@code SplittableCompressionCodec}.
-     */
-    @Override
-    public boolean isSplittable(){
-      return splittable;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
deleted file mode 100644
index 4f58e68..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-
-/**
- * Line reader for compressed splits
- *
- * Reading records from a compressed split is tricky, as the
- * LineRecordReader is using the reported compressed input stream
- * position directly to determine when a split has ended.  In addition the
- * compressed input stream is usually faking the actual byte position, often
- * updating it only after the first compressed block after the split is
- * accessed.
- *
- * Depending upon where the last compressed block of the split ends relative
- * to the record delimiters it can be easy to accidentally drop the last
- * record or duplicate the last record between this split and the next.
- *
- * Split end scenarios:
- *
- * 1) Last block of split ends in the middle of a record
- *      Nothing special that needs to be done here, since the compressed input
- *      stream will report a position after the split end once the record
- *      is fully read.  The consumer of the next split will discard the
- *      partial record at the start of the split normally, and no data is lost
- *      or duplicated between the splits.
- *
- * 2) Last block of split ends in the middle of a delimiter
- *      The line reader will continue to consume bytes into the next block to
- *      locate the end of the delimiter.  If a custom delimiter is being used
- *      then the next record must be read by this split or it will be dropped.
- *      The consumer of the next split will not recognize the partial
- *      delimiter at the beginning of its split and will discard it along with
- *      the next record.
- *
- *      However for the default delimiter processing there is a special case
- *      because CR, LF, and CRLF are all valid record delimiters.  If the
- *      block ends with a CR then the reader must peek at the next byte to see
- *      if it is an LF and therefore part of the same record delimiter.
- *      Peeking at the next byte is an access to the next block and triggers
- *      the stream to report the end of the split.  There are two cases based
- *      on the next byte:
- *
- *      A) The next byte is LF
- *           The split needs to end after the current record is returned.  The
- *           consumer of the next split will discard the first record, which
- *           is degenerate since LF is itself a delimiter, and start consuming
- *           records after that byte.  If the current split tries to read
- *           another record then the record will be duplicated between splits.
- *
- *      B) The next byte is not LF
- *           The current record will be returned but the stream will report
- *           the split has ended due to the peek into the next block.  If the
- *           next record is not read then it will be lost, as the consumer of
- *           the next split will discard it before processing subsequent
- *           records.  Therefore the next record beyond the reported split end
- *           must be consumed by this split to avoid data loss.
- *
- * 3) Last block of split ends at the beginning of a delimiter
- *      This is equivalent to case 1, as the reader will consume bytes into
- *      the next block and trigger the end of the split.  No further records
- *      should be read as the consumer of the next split will discard the
- *      (degenerate) record at the beginning of its split.
- *
- * 4) Last block of split ends at the end of a delimiter
- *      Nothing special needs to be done here. The reader will not start
- *      examining the bytes into the next block until the next record is read,
- *      so the stream will not report the end of the split just yet.  Once the
- *      next record is read then the next block will be accessed and the
- *      stream will indicate the end of the split.  The consumer of the next
- *      split will correctly discard the first record of its split, and no
- *      data is lost or duplicated.
- *
- *      If the default delimiter is used and the block ends at a CR then this
- *      is treated as case 2 since the reader does not yet know without
- *      looking at subsequent bytes whether the delimiter has ended.
- *
- * NOTE: It is assumed that compressed input streams *never* return bytes from
- *       multiple compressed blocks from a single read.  Failure to do so will
- *       violate the buffering performed by this class, as it will access
- *       bytes into the next block after the split before returning all of the
- *       records from the previous block.
- */
-
-public class CompressedSplitLineReader extends SplitLineReader {
-  SplitCompressionInputStream scin;
-  // True when no custom record delimiter was supplied, i.e. CR, LF and CRLF all end a record.
-  private boolean usingCRLF;
-  private boolean needAdditionalRecord = false;
-  private boolean finished = false;
-
-  /**
-   * @param in                   compressed split stream to read lines from
-   * @param conf                 configuration passed to the parent reader
-   * @param recordDelimiterBytes custom record delimiter, or {@code null} for the default
-   * @throws IOException if the parent reader cannot be created
-   */
-  public CompressedSplitLineReader(SplitCompressionInputStream in,
-                                   Configuration conf,
-                                   byte[] recordDelimiterBytes)
-      throws IOException {
-    super(in, conf, recordDelimiterBytes);
-    scin = in;
-    usingCRLF = (recordDelimiterBytes == null);
-  }
-
-  /**
-   * Reads the next chunk into {@code buffer} and, when the read happens in the
-   * middle of a record delimiter, decides whether one more record must be read
-   * after the split ends.
-   */
-  @Override
-  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
-      throws IOException {
-    final int bytesRead = in.read(buffer);
-
-    // A split that ends inside a delimiter normally needs one extra record, because
-    // the next split's consumer will not recognize the partial delimiter. With the
-    // default delimiter, a leading linefeed is a delimiter on its own for the next
-    // split, so no extra record is read in that case.
-    if (inDelimiter && bytesRead > 0) {
-      needAdditionalRecord = !usingCRLF || buffer[0] != '\n';
-    }
-    return bytesRead;
-  }
-
-  @Override
-  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-    if (finished) {
-      return 0;
-    }
-    markFinishedIfPastSplitEnd();
-    return super.readLine(str, maxLineLength, maxBytesToConsume);
-  }
-
-  @Override
-  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
-      , int maxBytesToConsume) throws IOException {
-    if (finished) {
-      return 0;
-    }
-    markFinishedIfPastSplitEnd();
-    return super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume);
-  }
-
-  /**
-   * Marks the reader finished once the stream reports a position past the adjusted
-   * split end, so that at most one more record is read after that point.
-   */
-  private void markFinishedIfPastSplitEnd() throws IOException {
-    if (scin.getPos() > scin.getAdjustedEnd()) {
-      finished = true;
-    }
-  }
-
-  @Override
-  public boolean needAdditionalRecordAfterSplit() {
-    return !finished && needAdditionalRecord;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
deleted file mode 100644
index 8841a31..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-/**
- * Immutable pair of a host name and a volume id.
- */
-public class DataLocation {
-  // Never reassigned after construction, so both fields are final.
-  private final String host;
-  private final int volumeId;
-
-  /**
-   * @param host     host name; stored as given
-   * @param volumeId volume id; stored as given
-   */
-  public DataLocation(String host, int volumeId) {
-    this.host = host;
-    this.volumeId = volumeId;
-  }
-
-  /** Returns the host name passed to the constructor. */
-  public String getHost() {
-    return host;
-  }
-
-  /** Returns the volume id passed to the constructor. */
-  public int getVolumeId() {
-    return volumeId;
-  }
-
-  @Override
-  public String toString() {
-    return "DataLocation{" +
-        "host=" + host +
-        ", volumeId=" + volumeId +
-        '}';
-  }
-}