Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:44 UTC

[29/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index f807987..8a61cab 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -70,7 +70,7 @@ public class TestSortExec {
     util = TpchTestBase.getInstance().getTestingCluster();
     catalog = util.getMaster().getCatalog();
     workDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    sm = StorageManager.getFileStorageManager(conf, workDir);
+    sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, workDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerid", Type.INT4);
@@ -82,7 +82,8 @@ public class TestSortExec {
     tablePath = StorageUtil.concatPath(workDir, "employee", "table1");
     sm.getFileSystem().mkdirs(tablePath.getParent());
 
-    Appender appender = StorageManager.getFileStorageManager(conf).getAppender(employeeMeta, schema, tablePath);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(employeeMeta, schema, tablePath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
     for (int i = 0; i < 100; i++) {
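
This pattern recurs across the test changes in this commit: the static factory
now returns the generic StorageManager type, so call sites that need
file-specific methods such as getAppender() must downcast. A minimal sketch of
the new call-site idiom, assuming an already-created conf, meta, schema, and
tablePath as in the hunk above (the lifecycle calls follow the Appender
interface that is removed from tajo-storage later in this diff):

    // Obtain the storage manager through the generic factory, then downcast
    // to reach FileStorageManager-only APIs such as getAppender().
    FileStorageManager sm =
        (FileStorageManager) StorageManager.getFileStorageManager(conf);
    Appender appender = sm.getAppender(meta, schema, tablePath);
    appender.init();            // open the underlying output
    appender.addTuple(tuple);   // write one Tuple
    appender.flush();
    appender.close();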

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index b74f634..68b3fb3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -565,8 +565,8 @@ public class TestJoinBroadcast extends QueryTestCaseBase {
         }
         Path dataPath = new Path(table.getPath().toString(), fileIndex + ".csv");
         fileIndex++;
-        appender = StorageManager.getFileStorageManager(conf).getAppender(tableMeta, schema,
-            dataPath);
+        appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+            .getAppender(tableMeta, schema, dataPath);
         appender.init();
       }
       String[] columnDatas = rows[i].split("\\|");

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
index ac5ff13..b8f3ef7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -63,7 +63,7 @@ public class TestResultSet {
   public static void setup() throws Exception {
     util = TpchTestBase.getInstance().getTestingCluster();
     conf = util.getConfiguration();
-    sm = StorageManager.getFileStorageManager(conf);
+    sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
 
     scoreSchema = new Schema();
     scoreSchema.addColumn("deptname", Type.TEXT);
@@ -73,8 +73,7 @@ public class TestResultSet {
 
     Path p = sm.getTablePath("score");
     sm.getFileSystem().mkdirs(p);
-    Appender appender = StorageManager.getFileStorageManager(conf).getAppender(scoreMeta, scoreSchema,
-        new Path(p, "score"));
+    Appender appender = sm.getAppender(scoreMeta, scoreSchema, new Path(p, "score"));
     appender.init();
     int deptSize = 100;
     int tupleNum = 10000;

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
index 2aa56db..f36ff24 100644
--- a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -36,7 +36,6 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.FileUtil;
 import org.junit.After;
 import org.junit.Before;
@@ -70,7 +69,8 @@ public class TestRowFile {
 
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.ROWFILE);
 
-    FileStorageManager sm = StorageManager.getFileStorageManager(conf, new Path(conf.getVar(ConfVars.ROOT_DIR)));
+    FileStorageManager sm =
+        (FileStorageManager)StorageManager.getFileStorageManager(conf, new Path(conf.getVar(ConfVars.ROOT_DIR)));
 
     Path tablePath = new Path("/test");
     Path metaPath = new Path(tablePath, ".meta");
@@ -80,7 +80,7 @@ public class TestRowFile {
 
     FileUtil.writeProto(fs, metaPath, meta.getProto());
 
-    Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, dataPath);
+    Appender appender = sm.getAppender(meta, schema, dataPath);
     appender.enableStats();
     appender.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 517f425..5a93538 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -185,7 +185,7 @@ public class TestRangeRetrieverHandler {
     reader.open();
 
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
-    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema,
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema,
         StorageUtil.concatPath(testDir, "output", "output"));
 
     scanner.init();
@@ -308,7 +308,7 @@ public class TestRangeRetrieverHandler {
         new Path(testDir, "output/index"), keySchema, comp);
     reader.open();
     TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
-    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, outputMeta, schema,
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, outputMeta, schema,
         StorageUtil.concatPath(testDir, "output", "output"));
     scanner.init();
     int cnt = 0;
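
The two hunks above show that getSeekableScanner() has moved from
StorageManager to FileStorageManager as a static helper; only the owning class
changes, not the signature. A hedged sketch of the relocated call, assuming a
conf, meta, schema, and output path like those in the test:

    // The seekable-scanner factory now lives on FileStorageManager.
    SeekableScanner scanner = FileStorageManager.getSeekableScanner(
        conf, meta, schema, StorageUtil.concatPath(testDir, "output", "output"));
    scanner.init();
    // ... read tuples, seeking as needed ...
    scanner.close();   // assumed: scanners are closed after use, as in the tests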

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index d350889..eb8ada9 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -75,7 +75,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-storage</artifactId>
+      <artifactId>tajo-storage-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-hdfs</artifactId>
       <scope>provided</scope>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-jdbc/pom.xml b/tajo-jdbc/pom.xml
index 20cdf16..1c3c410 100644
--- a/tajo-jdbc/pom.xml
+++ b/tajo-jdbc/pom.xml
@@ -102,7 +102,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-storage</artifactId>
+      <artifactId>tajo-storage-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-hdfs</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index a82aa46..82ccbdc 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -744,6 +744,22 @@
         <groupId>org.apache.tajo</groupId>
         <artifactId>tajo-storage</artifactId>
         <version>${tajo.version}</version>
+        <type>pom</type>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tajo</groupId>
+        <artifactId>tajo-storage-common</artifactId>
+        <version>${tajo.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tajo</groupId>
+        <artifactId>tajo-storage-hdfs</artifactId>
+        <version>${tajo.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tajo</groupId>
+        <artifactId>tajo-storage-hbase</artifactId>
+        <version>${tajo.version}</version>
       </dependency>
       <dependency>
         <groupId>org.apache.tajo</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index dee429f..8acb1a9 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -1,5 +1,5 @@
-<!--
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
   Copyright 2012 Database Lab., Korea Univ.
 
   Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,328 +16,47 @@
   -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
   <parent>
     <artifactId>tajo-project</artifactId>
     <groupId>org.apache.tajo</groupId>
     <version>0.9.1-SNAPSHOT</version>
     <relativePath>../tajo-project</relativePath>
   </parent>
-
+  <modelVersion>4.0.0</modelVersion>
   <artifactId>tajo-storage</artifactId>
-  <packaging>jar</packaging>
+  <packaging>pom</packaging>
   <name>Tajo Storage</name>
-  <description>Tajo Storage Package</description>
-
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <parquet.version>1.5.0</parquet.version>
-    <parquet.format.version>2.1.0</parquet.format.version>
   </properties>
 
-  <repositories>
-    <repository>
-      <id>repository.jboss.org</id>
-      <url>https://repository.jboss.org/nexus/content/repositories/releases/
-            </url>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </repository>
-  </repositories>
+  <modules>
+    <module>tajo-storage-common</module>
+    <module>tajo-storage-hdfs</module>
+    <module>tajo-storage-hbase</module>
+  </modules>
 
   <build>
     <plugins>
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
-          <encoding>${project.build.sourceEncoding}</encoding>
-        </configuration>
-      </plugin>
-      <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>verify</phase>
-            <goals>
-              <goal>check</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <excludes>
-            <exclude>src/test/resources/testVariousTypes.avsc</exclude>
-          </excludes>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <systemProperties>
-            <tajo.test>TRUE</tajo.test>
-          </systemProperties>
-          <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine>
-        </configuration>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <version>2.4</version>
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
+        <artifactId>maven-surefire-report-plugin</artifactId>
       </plugin>
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-antrun-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>create-protobuf-generated-sources-directory</id>
-            <phase>initialize</phase>
-            <configuration>
-              <target>
-                <mkdir dir="target/generated-sources/proto" />
-              </target>
-            </configuration>
-            <goals>
-              <goal>run</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>exec-maven-plugin</artifactId>
-        <version>1.2</version>
-        <executions>
-          <execution>
-            <id>generate-sources</id>
-            <phase>generate-sources</phase>
-            <configuration>
-              <executable>protoc</executable>
-              <arguments>
-                <argument>-Isrc/main/proto/</argument>
-                <argument>--proto_path=../tajo-common/src/main/proto</argument>
-                <argument>--proto_path=../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
-                <argument>--java_out=target/generated-sources/proto</argument>
-                <argument>src/main/proto/IndexProtos.proto</argument>
-                <argument>src/main/proto/StorageFragmentProtos.proto</argument>
-              </arguments>
-            </configuration>
-            <goals>
-              <goal>exec</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <version>1.5</version>
-        <executions>
-          <execution>
-            <id>add-source</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>target/generated-sources/proto</source>
-              </sources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-pmd-plugin</artifactId>
-        <version>2.7.1</version>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
       </plugin>
     </plugins>
   </build>
 
 
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-catalog-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-plan</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-      <version>1.7.7</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <artifactId>zookeeper</artifactId>
-          <groupId>org.apache.zookeeper</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>slf4j-api</artifactId>
-          <groupId>org.slf4j</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>jersey-json</artifactId>
-          <groupId>com.sun.jersey</groupId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <scope>provided</scope>
-      <exclusions>
-      <exclusion>
-        <groupId>commons-el</groupId>
-        <artifactId>commons-el</artifactId>
-      </exclusion>
-      <exclusion>
-        <groupId>tomcat</groupId>
-        <artifactId>jasper-runtime</artifactId>
-      </exclusion>
-      <exclusion>
-        <groupId>tomcat</groupId>
-        <artifactId>jasper-compiler</artifactId>
-      </exclusion>
-      <exclusion>
-        <groupId>org.mortbay.jetty</groupId>
-        <artifactId>jsp-2.1-jetty</artifactId>
-      </exclusion>
-        <exclusion>
-          <groupId>com.sun.jersey.jersey-test-framework</groupId>
-          <artifactId>jersey-test-framework-grizzly2</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minicluster</artifactId>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>commons-el</groupId>
-          <artifactId>commons-el</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-runtime</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-compiler</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jsp-2.1-jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.sun.jersey.jersey-test-framework</groupId>
-          <artifactId>jersey-test-framework-grizzly2</artifactId>
-        </exclusion>
-        <exclusion>
-          <artifactId>hadoop-yarn-server-tests</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>hadoop-mapreduce-client-app</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>hadoop-yarn-api</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>hadoop-mapreduce-client-hs</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>hadoop-mapreduce-client-core</artifactId>
-          <groupId>org.apache.hadoop</groupId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>parquet-column</artifactId>
-      <version>${parquet.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>parquet-hadoop</artifactId>
-      <version>${parquet.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>parquet-format</artifactId>
-      <version>${parquet.format.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-server</artifactId>
-      <version>${hbase.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-client</artifactId>
-      <version>${hbase.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-buffer</artifactId>
-    </dependency>
-  </dependencies>
-
   <profiles>
     <profile>
       <id>docs</id>
@@ -382,7 +101,7 @@
             <executions>
               <execution>
                 <id>dist</id>
-                <phase>package</phase>
+                <phase>prepare-package</phase>
                 <goals>
                   <goal>run</goal>
                 </goals>
@@ -405,12 +124,15 @@
                       echo
                       echo "Current directory `pwd`"
                       echo
-                      run rm -rf ${project.artifactId}-${project.version}
-                      run mkdir ${project.artifactId}-${project.version}
-                      run cd ${project.artifactId}-${project.version}
-                      run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar .
+                      run rm -rf tajo-storage-${project.version}
+                      run mkdir tajo-storage-${project.version}
+                      run cd tajo-storage-${project.version}
+                      run cp -r ${basedir}/tajo-storage-common/target/tajo-storage-common-${project.version}*.jar .
+                      run cp -r ${basedir}/tajo-storage-hdfs/target/tajo-storage-hdfs-${project.version}*.jar .
+                      run cp -r ${basedir}/tajo-storage-hbase/target/tajo-storage-hbase-${project.version}*.jar .
+
                       echo
-                      echo "Tajo Storage dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}"
+                      echo "Tajo Storage dist layout available at: ${project.build.directory}/tajo-storage-${project.version}"
                       echo
                     </echo>
                     <exec executable="sh" dir="${project.build.directory}" failonerror="true">
@@ -430,11 +152,7 @@
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-project-info-reports-plugin</artifactId>
-        <version>2.4</version>
-        <configuration>
-          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
-        </configuration>
+        <artifactId>maven-surefire-report-plugin</artifactId>
       </plugin>
     </plugins>
   </reporting>

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
deleted file mode 100644
index c5e96ac..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.statistics.TableStats;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public interface Appender extends Closeable {
-
-  void init() throws IOException;
-
-  void addTuple(Tuple t) throws IOException;
-  
-  void flush() throws IOException;
-
-  long getEstimatedOutputSize() throws IOException;
-  
-  void close() throws IOException;
-
-  void enableStats();
-  
-  TableStats getStats();
-}
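
The Appender interface is deleted here as part of the module split, presumably
re-homed under one of the new tajo-storage-* modules rather than retired; its
contract is unchanged. Purely as a hedged illustration of that contract
(NoOpAppender is a hypothetical name, not part of the commit):

    // Minimal shape an Appender implementation must honor.
    public class NoOpAppender implements Appender {
      private boolean statsEnabled;
      @Override public void init() throws IOException { /* open output */ }
      @Override public void addTuple(Tuple t) throws IOException { /* write t */ }
      @Override public void flush() throws IOException { }
      @Override public long getEstimatedOutputSize() throws IOException { return 0; }
      @Override public void close() throws IOException { /* release resources */ }
      @Override public void enableStats() { statsEnabled = true; }
      @Override public TableStats getStats() { return null; } // none collected here
    }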

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
deleted file mode 100644
index b829f60..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.datum.Datum;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
-import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
-
-/**
- * The Comparator class for Tuples
- * 
- * @see Tuple
- */
-public class BaseTupleComparator extends TupleComparator implements ProtoObject<TupleComparatorProto> {
-  private final Schema schema;
-  private final SortSpec [] sortSpecs;
-  private final int[] sortKeyIds;
-  private final boolean[] asc;
-  @SuppressWarnings("unused")
-  private final boolean[] nullFirsts;  
-
-  private Datum left;
-  private Datum right;
-  private int compVal;
-
-  /**
-   * @param schema The schema of input tuples
-   * @param sortKeys The description of sort keys
-   */
-  public BaseTupleComparator(Schema schema, SortSpec[] sortKeys) {
-    Preconditions.checkArgument(sortKeys.length > 0, 
-        "At least one sort key must be specified.");
-
-    this.schema = schema;
-    this.sortSpecs = sortKeys;
-    this.sortKeyIds = new int[sortKeys.length];
-    this.asc = new boolean[sortKeys.length];
-    this.nullFirsts = new boolean[sortKeys.length];
-    for (int i = 0; i < sortKeys.length; i++) {
-      if (sortKeys[i].getSortKey().hasQualifier()) {
-        this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
-      } else {
-        this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
-      }
-          
-      this.asc[i] = sortKeys[i].isAscending();
-      this.nullFirsts[i]= sortKeys[i].isNullFirst();
-    }
-  }
-
-  public BaseTupleComparator(TupleComparatorProto proto) {
-    this.schema = new Schema(proto.getSchema());
-
-    this.sortSpecs = new SortSpec[proto.getSortSpecsCount()];
-    for (int i = 0; i < proto.getSortSpecsCount(); i++) {
-      sortSpecs[i] = new SortSpec(proto.getSortSpecs(i));
-    }
-
-    this.sortKeyIds = new int[proto.getCompSpecsCount()];
-    this.asc = new boolean[proto.getCompSpecsCount()];
-    this.nullFirsts = new boolean[proto.getCompSpecsCount()];
-
-    for (int i = 0; i < proto.getCompSpecsCount(); i++) {
-      TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i);
-      sortKeyIds[i] = sortSepcProto.getColumnId();
-      asc[i] = sortSepcProto.getAscending();
-      nullFirsts[i] = sortSepcProto.getNullFirst();
-    }
-  }
-
-  public Schema getSchema() {
-    return schema;
-  }
-
-  public SortSpec [] getSortSpecs() {
-    return sortSpecs;
-  }
-
-  public int [] getSortKeyIds() {
-    return sortKeyIds;
-  }
-
-  @Override
-  public boolean isAscendingFirstKey() {
-    return this.asc[0];
-  }
-
-  @Override
-  public int compare(Tuple tuple1, Tuple tuple2) {
-    for (int i = 0; i < sortKeyIds.length; i++) {
-      left = tuple1.get(sortKeyIds[i]);
-      right = tuple2.get(sortKeyIds[i]);
-
-      if (left.isNull() || right.isNull()) {
-        if (!left.equals(right)) {
-          if (left.isNull()) {
-            compVal = 1;
-          } else if (right.isNull()) {
-            compVal = -1;
-          }
-          if (nullFirsts[i]) {
-            if (compVal != 0) {
-              compVal *= -1;
-            }
-          }
-        } else {
-          compVal = 0;
-        }
-      } else {
-        if (asc[i]) {
-          compVal = left.compareTo(right);
-        } else {
-          compVal = right.compareTo(left);
-        }
-      }
-
-      if (compVal < 0 || compVal > 0) {
-        return compVal;
-      }
-    }
-    return 0;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(sortKeyIds);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof BaseTupleComparator) {
-      BaseTupleComparator other = (BaseTupleComparator) obj;
-      if (sortKeyIds.length != other.sortKeyIds.length) {
-        return false;
-      }
-
-      for (int i = 0; i < sortKeyIds.length; i++) {
-        if (sortKeyIds[i] != other.sortKeyIds[i] ||
-            asc[i] != other.asc[i] ||
-            nullFirsts[i] != other.nullFirsts[i]) {
-          return false;
-        }
-      }
-
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public TupleComparatorProto getProto() {
-    TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
-    builder.setSchema(schema.getProto());
-    for (int i = 0; i < sortSpecs.length; i++) {
-      builder.addSortSpecs(sortSpecs[i].getProto());
-    }
-
-    TupleComparatorSpecProto.Builder sortSpecBuilder;
-    for (int i = 0; i < sortKeyIds.length; i++) {
-      sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
-      sortSpecBuilder.setColumnId(sortKeyIds[i]);
-      sortSpecBuilder.setAscending(asc[i]);
-      sortSpecBuilder.setNullFirst(nullFirsts[i]);
-      builder.addCompSpecs(sortSpecBuilder);
-    }
-
-    return builder.build();
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-
-    String prefix = "";
-    for (int i = 0; i < sortKeyIds.length; i++) {
-      sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i])
-        .append(",Asc=").append(asc[i])
-        .append(",NullFirst=").append(nullFirsts[i]);
-      prefix = " ,";
-    }
-    return sb.toString();
-  }
-}
\ No newline at end of file
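
BaseTupleComparator's ordering is fully determined by its SortSpec array: each
key compares ascending or descending, with nulls sorting last unless nullFirst
is set. A hedged usage sketch (schema, column, and values are illustrative,
and the SortSpec constructor shape is assumed from the catalog API):

    // Compare two tuples on an ascending INT4 column.
    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);
    SortSpec[] specs = { new SortSpec(schema.getColumn("id"), true, false) };
    BaseTupleComparator comp = new BaseTupleComparator(schema, specs);

    Tuple a = new VTuple(schema.size());
    a.put(0, DatumFactory.createInt4(2));
    Tuple b = new VTuple(schema.size());
    b.put(0, DatumFactory.createInt4(1));
    assert comp.compare(a, b) > 0;   // 2 sorts after 1 in ascending order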

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
deleted file mode 100644
index 00112e7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Message;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.util.Bytes;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-@Deprecated
-public class BinarySerializerDeserializer implements SerializerDeserializer {
-
-  static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
-
-  @Override
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
-      throws IOException {
-    byte[] bytes;
-    int length = 0;
-    if (datum == null || datum instanceof NullDatum) {
-      return 0;
-    }
-
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-      case BIT:
-      case CHAR:
-        bytes = datum.asByteArray();
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case INT2:
-        length = writeShort(out, datum.asInt2());
-        break;
-      case INT4:
-        length = writeVLong(out, datum.asInt4());
-        break;
-      case INT8:
-        length = writeVLong(out, datum.asInt8());
-        break;
-      case FLOAT4:
-        length = writeFloat(out, datum.asFloat4());
-        break;
-      case FLOAT8:
-        length = writeDouble(out, datum.asFloat8());
-        break;
-      case TEXT: {
-        bytes = datum.asTextBytes();
-        length = datum.size();
-        if (length == 0) {
-          bytes = INVALID_UTF__SINGLE_BYTE;
-          length = INVALID_UTF__SINGLE_BYTE.length;
-        }
-        out.write(bytes, 0, bytes.length);
-        break;
-      }
-      case BLOB:
-      case INET4:
-      case INET6:
-        bytes = datum.asByteArray();
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case PROTOBUF:
-        ProtobufDatum protobufDatum = (ProtobufDatum) datum;
-        bytes = protobufDatum.asByteArray();
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case NULL_TYPE:
-        break;
-      default:
-        throw new IOException("Does not support type");
-    }
-    return length;
-  }
-
-  @Override
-  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
-    if (length == 0) return NullDatum.get();
-
-    Datum datum;
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-        datum = DatumFactory.createBool(bytes[offset]);
-        break;
-      case BIT:
-        datum = DatumFactory.createBit(bytes[offset]);
-        break;
-      case CHAR: {
-        byte[] chars = new byte[length];
-        System.arraycopy(bytes, offset, chars, 0, length);
-        datum = DatumFactory.createChar(chars);
-        break;
-      }
-      case INT2:
-        datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
-        break;
-      case INT4:
-        datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
-        break;
-      case INT8:
-        datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
-        break;
-      case FLOAT4:
-        datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
-        break;
-      case FLOAT8:
-        datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
-        break;
-      case TEXT: {
-        byte[] chars = new byte[length];
-        System.arraycopy(bytes, offset, chars, 0, length);
-
-        if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
-          datum = DatumFactory.createText(new byte[0]);
-        } else {
-          datum = DatumFactory.createText(chars);
-        }
-        break;
-      }
-      case PROTOBUF: {
-        ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
-        Message.Builder builder = factory.newBuilder();
-        builder.mergeFrom(bytes, offset, length);
-        datum = factory.createDatum(builder);
-        break;
-      }
-      case INET4:
-        datum = DatumFactory.createInet4(bytes, offset, length);
-        break;
-      case BLOB:
-        datum = DatumFactory.createBlob(bytes, offset, length);
-        break;
-      default:
-        datum = NullDatum.get();
-    }
-    return datum;
-  }
-
-  private byte[] shortBytes = new byte[2];
-
-  public int writeShort(OutputStream out, short val) throws IOException {
-    shortBytes[0] = (byte) (val >> 8);
-    shortBytes[1] = (byte) val;
-    out.write(shortBytes, 0, 2);
-    return 2;
-  }
-
-  public float toFloat(byte[] bytes, int offset, int length) {
-    Preconditions.checkArgument(length == 4);
-
-    int val = ((bytes[offset] & 0x000000FF) << 24) +
-        ((bytes[offset + 1] & 0x000000FF) << 16) +
-        ((bytes[offset + 2] & 0x000000FF) << 8) +
-        (bytes[offset + 3] & 0x000000FF);
-    return Float.intBitsToFloat(val);
-  }
-
-  private byte[] floatBytes = new byte[4];
-
-  public int writeFloat(OutputStream out, float f) throws IOException {
-    int val = Float.floatToIntBits(f);
-
-    floatBytes[0] = (byte) (val >> 24);
-    floatBytes[1] = (byte) (val >> 16);
-    floatBytes[2] = (byte) (val >> 8);
-    floatBytes[3] = (byte) val;
-    out.write(floatBytes, 0, 4);
-    return floatBytes.length;
-  }
-
-  public double toDouble(byte[] bytes, int offset, int length) {
-    Preconditions.checkArgument(length == 8);
-    long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
-        ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
-        ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
-        ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
-        ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
-        ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
-        ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
-        (long) (bytes[offset + 7] & 0x00000000000000FF);
-    return Double.longBitsToDouble(val);
-  }
-
-  private byte[] doubleBytes = new byte[8];
-
-  public int writeDouble(OutputStream out, double d) throws IOException {
-    long val = Double.doubleToLongBits(d);
-
-    doubleBytes[0] = (byte) (val >> 56);
-    doubleBytes[1] = (byte) (val >> 48);
-    doubleBytes[2] = (byte) (val >> 40);
-    doubleBytes[3] = (byte) (val >> 32);
-    doubleBytes[4] = (byte) (val >> 24);
-    doubleBytes[5] = (byte) (val >> 16);
-    doubleBytes[6] = (byte) (val >> 8);
-    doubleBytes[7] = (byte) val;
-    out.write(doubleBytes, 0, 8);
-    return doubleBytes.length;
-  }
-
-  private byte[] vLongBytes = new byte[9];
-
-  public static int writeVLongToByteArray(byte[] bytes, int offset, long l) {
-    if (l >= -112 && l <= 127) {
-      bytes[offset] = (byte) l;
-      return 1;
-    }
-
-    int len = -112;
-    if (l < 0) {
-      l ^= -1L; // take one's complement'
-      len = -120;
-    }
-
-    long tmp = l;
-    while (tmp != 0) {
-      tmp = tmp >> 8;
-      len--;
-    }
-
-    bytes[offset++] = (byte) len;
-    len = (len < -120) ? -(len + 120) : -(len + 112);
-
-    for (int idx = len; idx != 0; idx--) {
-      int shiftbits = (idx - 1) * 8;
-      bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
-    }
-    return 1 + len;
-  }
-
-  public int writeVLong(OutputStream out, long l) throws IOException {
-    int len = writeVLongToByteArray(vLongBytes, 0, l);
-    out.write(vLongBytes, 0, len);
-    return len;
-  }
-}
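
writeVLongToByteArray() is the Hadoop-style variable-length long encoding:
values in [-112, 127] occupy a single byte, anything else a marker byte whose
value encodes sign and payload length, followed by big-endian magnitude bytes.
A hedged round-trip sketch using Bytes.readVLong(), the same decoder the class
itself calls in deserialize():

    byte[] buf = new byte[9];                 // worst case: marker + 8 bytes
    int len = BinarySerializerDeserializer.writeVLongToByteArray(buf, 0, 300L);
    // 300 > 127, so it takes a marker byte plus two payload bytes.
    long decoded = Bytes.readVLong(buf, 0);   // org.apache.tajo.util.Bytes
    assert len == 3 && decoded == 300L;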

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BufferPool.java
deleted file mode 100644
index 85c79fa..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/BufferPool.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.util.internal.PlatformDependent;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/* this class is PooledBuffer holder */
-public class BufferPool {
-
-  private static final PooledByteBufAllocator allocator;
-
-  private BufferPool() {
-  }
-
-  static {
-    //TODO we need determine the default params
-    allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
-
-    /* if you are finding memory leak, please enable this line */
-    //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
-  }
-
-  public static long maxDirectMemory() {
-    return PlatformDependent.maxDirectMemory();
-  }
-
-
-  public synchronized static ByteBuf directBuffer(int size) {
-    return allocator.directBuffer(size);
-  }
-
-  /**
-   *
-   * @param size the initial capacity
-   * @param max the max capacity
-   * @return allocated ByteBuf from pool
-   */
-  public static ByteBuf directBuffer(int size, int max) {
-    return allocator.directBuffer(size, max);
-  }
-
-  @InterfaceStability.Unstable
-  public static void forceRelease(ByteBuf buf) {
-    buf.release(buf.refCnt());
-  }
-
-  /**
-   * the ByteBuf will increase to writable size
-   * @param buf
-   * @param minWritableBytes required minimum writable size
-   */
-  public static void ensureWritable(ByteBuf buf, int minWritableBytes) {
-    buf.ensureWritable(minWritableBytes);
-  }
-}
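
BufferPool is a thin static wrapper over Netty's PooledByteBufAllocator,
preferring direct buffers when the platform allows. A hedged usage sketch:

    // Borrow a pooled direct buffer with an initial and a maximum capacity.
    ByteBuf buf = BufferPool.directBuffer(4096, 64 * 1024);
    BufferPool.ensureWritable(buf, 8192);   // grow toward the max if needed
    buf.writeLong(42L);
    // forceRelease() drops every outstanding reference in one call, returning
    // the buffer to the pool regardless of the current refCnt().
    BufferPool.forceRelease(buf);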

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
deleted file mode 100644
index b1b6d65..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.hdfs.DFSInputStream;
-import org.apache.hadoop.io.IOUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.ScatteringByteChannel;
-import java.nio.channels.spi.AbstractInterruptibleChannel;
-
-public class ByteBufInputChannel extends AbstractInterruptibleChannel implements ScatteringByteChannel {
-
-  ByteBufferReadable byteBufferReadable;
-  ReadableByteChannel channel;
-  InputStream inputStream;
-
-  public ByteBufInputChannel(InputStream inputStream) {
-    if (inputStream instanceof DFSInputStream && inputStream instanceof ByteBufferReadable) {
-      this.byteBufferReadable = (ByteBufferReadable) inputStream;
-    } else {
-      this.channel = Channels.newChannel(inputStream);
-    }
-
-    this.inputStream = inputStream;
-  }
-
-  @Override
-  public long read(ByteBuffer[] dsts, int offset, int length) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long read(ByteBuffer[] dsts) {
-    return read(dsts, 0, dsts.length);
-  }
-
-  @Override
-  public int read(ByteBuffer dst) throws IOException {
-    if (byteBufferReadable != null) {
-      return byteBufferReadable.read(dst);
-    } else {
-      return channel.read(dst);
-    }
-  }
-
-  @Override
-  protected void implCloseChannel() throws IOException {
-    IOUtils.cleanup(null, channel, inputStream);
-  }
-
-  public int available() throws IOException {
-    return inputStream.available();
-  }
-}
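
ByteBufInputChannel adapts an InputStream to a ScatteringByteChannel. When the
stream is a DFSInputStream it reads through the ByteBufferReadable interface
directly; otherwise it falls back to Channels.newChannel(). A hedged sketch of
the fallback path:

    // Wrap an ordinary stream; reads delegate to a ReadableByteChannel.
    InputStream in = new ByteArrayInputStream(new byte[]{1, 2, 3, 4});
    ByteBufInputChannel channel = new ByteBufInputChannel(in);
    ByteBuffer dst = ByteBuffer.allocate(4);
    int n = channel.read(dst);   // returns 4 here
    channel.close();             // implCloseChannel() cleans up both objects
    assert n == 4;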

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
deleted file mode 100644
index 1e2b0f3..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ /dev/null
@@ -1,588 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.compress.CodecPool;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-import org.apache.tajo.util.BytesUtils;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-public class CSVFile {
-
-  public static final byte LF = '\n';
-  public static int EOF = -1;
-
-  private static final Log LOG = LogFactory.getLog(CSVFile.class);
-
-  public static class CSVAppender extends FileAppender {
-    private final TableMeta meta;
-    private final Schema schema;
-    private final int columnNum;
-    private final FileSystem fs;
-    private FSDataOutputStream fos;
-    private DataOutputStream outputStream;
-    private CompressionOutputStream deflateFilter;
-    private char delimiter;
-    private TableStatistics stats = null;
-    private Compressor compressor;
-    private CompressionCodecFactory codecFactory;
-    private CompressionCodec codec;
-    private Path compressedPath;
-    private byte[] nullChars;
-    private int BUFFER_SIZE = 128 * 1024;
-    private int bufferedBytes = 0;
-    private long pos = 0;
-    private boolean isShuffle;
-
-    private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
-    private SerializerDeserializer serde;
-
-    public CSVAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
-                       final Schema schema, final TableMeta meta, final Path workDir) throws IOException {
-      super(conf, taskAttemptId, schema, meta, workDir);
-      this.fs = workDir.getFileSystem(conf);
-      this.meta = meta;
-      this.schema = schema;
-      this.delimiter = StringEscapeUtils.unescapeJava(
-          this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
-
-      this.columnNum = schema.size();
-
-      String nullCharacters = StringEscapeUtils.unescapeJava(
-          this.meta.getOption(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT));
-
-      if (StringUtils.isEmpty(nullCharacters)) {
-        nullChars = NullDatum.get().asTextBytes();
-      } else {
-        nullChars = nullCharacters.getBytes();
-      }
-    }
-
-    @Override
-    public void init() throws IOException {
-      if (!fs.exists(path.getParent())) {
-        throw new FileNotFoundException(path.getParent().toString());
-      }
-
-      //determine the intermediate file type
-      String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
-          TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
-      if (enabledStats && CatalogProtos.StoreType.CSV == CatalogProtos.StoreType.valueOf(store.toUpperCase())) {
-        isShuffle = true;
-      } else {
-        isShuffle = false;
-      }
-
-      if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
-        String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
-        codecFactory = new CompressionCodecFactory(conf);
-        codec = codecFactory.getCodecByClassName(codecName);
-        compressor =  CodecPool.getCompressor(codec);
-        if(compressor != null) compressor.reset();  //builtin gzip is null
-
-        String extension = codec.getDefaultExtension();
-        compressedPath = path.suffix(extension);
-
-        if (fs.exists(compressedPath)) {
-          throw new AlreadyExistsStorageException(compressedPath);
-        }
-
-        fos = fs.create(compressedPath);
-        deflateFilter = codec.createOutputStream(fos, compressor);
-        outputStream = new DataOutputStream(deflateFilter);
-
-      } else {
-        if (fs.exists(path)) {
-          throw new AlreadyExistsStorageException(path);
-        }
-        fos = fs.create(path);
-        outputStream = new DataOutputStream(new BufferedOutputStream(fos));
-      }
-
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
-      }
-
-      try {
-        //It will be remove, because we will add custom serde in textfile
-        String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
-            TextSerializerDeserializer.class.getName());
-        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        throw new IOException(e);
-      }
-
-      os.reset();
-      pos = fos.getPos();
-      bufferedBytes = 0;
-      super.init();
-    }
-
-
-    @Override
-    public void addTuple(Tuple tuple) throws IOException {
-      Datum datum;
-      int rowBytes = 0;
-
-      for (int i = 0; i < columnNum; i++) {
-        datum = tuple.get(i);
-        rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars);
-
-        if(columnNum - 1 > i){
-          os.write((byte) delimiter);
-          rowBytes += 1;
-        }
-        if (isShuffle) {
-          // it is to calculate min/max values, and it is only used for the intermediate file.
-          stats.analyzeField(i, datum);
-        }
-      }
-      os.write(LF);
-      rowBytes += 1;
-
-      pos += rowBytes;
-      bufferedBytes += rowBytes;
-      if(bufferedBytes > BUFFER_SIZE){
-        flushBuffer();
-      }
-      // Statistical section
-      if (enabledStats) {
-        stats.incrementRow();
-      }
-    }
-
-    private void flushBuffer() throws IOException {
-      if(os.getLength() > 0) {
-        os.writeTo(outputStream);
-        os.reset();
-        bufferedBytes = 0;
-      }
-    }
-    @Override
-    public long getOffset() throws IOException {
-      return pos;
-    }
-
-    @Override
-    public void flush() throws IOException {
-      flushBuffer();
-      outputStream.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-
-      try {
-        flush();
-
-        // Statistical section
-        if (enabledStats) {
-          stats.setNumBytes(getOffset());
-        }
-
-        if(deflateFilter != null) {
-          deflateFilter.finish();
-          deflateFilter.resetState();
-          deflateFilter = null;
-        }
-
-        os.close();
-      } finally {
-        IOUtils.cleanup(LOG, fos);
-        if (compressor != null) {
-          CodecPool.returnCompressor(compressor);
-          compressor = null;
-        }
-      }
-    }
-
-    @Override
-    public TableStats getStats() {
-      if (enabledStats) {
-        return stats.getTableStat();
-      } else {
-        return null;
-      }
-    }
-
-    public boolean isCompress() {
-      return compressor != null;
-    }
-
-    public String getExtension() {
-      return codec != null ? codec.getDefaultExtension() : "";
-    }
-  }
-
-  public static class CSVScanner extends FileScanner implements SeekableScanner {
-    public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment)
-        throws IOException {
-      super(conf, schema, meta, fragment);
-      factory = new CompressionCodecFactory(conf);
-      codec = factory.getCodec(this.fragment.getPath());
-      if (codec == null || codec instanceof SplittableCompressionCodec) {
-        splittable = true;
-      }
-
-      //Delimiter
-      this.delimiter = StringEscapeUtils.unescapeJava(
-          meta.getOption(StorageConstants.TEXT_DELIMITER,
-          meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))).charAt(0);
-
-      String nullCharacters = StringEscapeUtils.unescapeJava(
-          meta.getOption(StorageConstants.TEXT_NULL,
-          meta.getOption(StorageConstants.CSVFILE_NULL, NullDatum.DEFAULT_TEXT)));
-
-      if (StringUtils.isEmpty(nullCharacters)) {
-        nullChars = NullDatum.get().asTextBytes();
-      } else {
-        nullChars = nullCharacters.getBytes();
-      }
-    }
-
-    private static final int DEFAULT_PAGE_SIZE = 256 * 1024;
-    private char delimiter;
-    private FileSystem fs;
-    private FSDataInputStream fis;
-    private InputStream is; // decompressed stream
-    private CompressionCodecFactory factory;
-    private CompressionCodec codec;
-    private Decompressor decompressor;
-    private Seekable filePosition;
-    private boolean splittable = false;
-    private long startOffset, end, pos;
-    private int currentIdx = 0, validIdx = 0, recordCount = 0;
-    private int[] targetColumnIndexes;
-    private boolean eof = false;
-    private final byte[] nullChars;
-    private SplitLineReader reader;
-    private ArrayList<Long> fileOffsets;
-    private ArrayList<Integer> rowLengthList;
-    private ArrayList<Integer> startOffsets;
-    private NonSyncByteArrayOutputStream buffer;
-    private SerializerDeserializer serde;
-
-    @Override
-    public void init() throws IOException {
-      fileOffsets = new ArrayList<Long>();
-      rowLengthList = new ArrayList<Integer>();
-      startOffsets = new ArrayList<Integer>();
-      buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
-
-      // FileFragment information
-      if(fs == null) {
-        fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath());
-      }
-      if(fis == null) fis = fs.open(fragment.getPath());
-
-      recordCount = 0;
-      pos = startOffset = fragment.getStartKey();
-      end = startOffset + fragment.getLength();
-
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-        if (codec instanceof SplittableCompressionCodec) {
-          SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
-              fis, decompressor, startOffset, end,
-              SplittableCompressionCodec.READ_MODE.BYBLOCK);
-
-          reader = new CompressedSplitLineReader(cIn, conf, null);
-          startOffset = cIn.getAdjustedStart();
-          end = cIn.getAdjustedEnd();
-          filePosition = cIn;
-          is = cIn;
-        } else {
-          is = new DataInputStream(codec.createInputStream(fis, decompressor));
-          reader = new SplitLineReader(is, null);
-          filePosition = fis;
-        }
-      } else {
-        fis.seek(startOffset);
-        filePosition = fis;
-        is = fis;
-        reader = new SplitLineReader(is, null);
-      }
-
-      if (targets == null) {
-        targets = schema.toArray();
-      }
-
-      targetColumnIndexes = new int[targets.length];
-      for (int i = 0; i < targets.length; i++) {
-        targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
-      }
-
-      try {
-        // FIXME: same serde lookup as in the appender; to be removed once a
-        // custom serde is added to the text file format
-        String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
-            TextSerializerDeserializer.class.getName());
-        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        throw new IOException(e);
-      }
-
-      super.init();
-      Arrays.sort(targetColumnIndexes);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end +
-            "," + fs.getFileStatus(fragment.getPath()).getLen());
-      }
-
-      if (startOffset != 0) {
-        pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos));
-      }
-      eof = false;
-      page();
-    }
-
-    private int maxBytesToConsume(long pos) {
-      return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
-    }
-
-    private long fragmentable() throws IOException {
-      return end - getFilePosition();
-    }
-
-    private long getFilePosition() throws IOException {
-      long retVal;
-      if (isCompress()) {
-        retVal = filePosition.getPos();
-      } else {
-        retVal = pos;
-      }
-      return retVal;
-    }
-
-    private void page() throws IOException {
-      // Index initialization
-      currentIdx = 0;
-      validIdx = 0;
-      int currentBufferPos = 0;
-      int bufferedSize = 0;
-
-      buffer.reset();
-      startOffsets.clear();
-      rowLengthList.clear();
-      fileOffsets.clear();
-
-      if(eof) {
-        return;
-      }
-
-      while (DEFAULT_PAGE_SIZE >= bufferedSize){
-
-        int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
-
-        if(ret == 0){
-          break;
-        } else {
-          fileOffsets.add(pos);
-          pos += ret;
-          startOffsets.add(currentBufferPos);
-          currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
-          bufferedSize += ret;
-          validIdx++;
-          recordCount++;
-        }
-
-        if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){
-          eof = true;
-          break;
-        }
-      }
-      if (tableStats != null) {
-        tableStats.setReadBytes(pos - startOffset);
-        tableStats.setNumRows(recordCount);
-      }
-    }
-
-    @Override
-    public float getProgress() {
-      try {
-        if(eof) {
-          return 1.0f;
-        }
-        long filePos = getFilePosition();
-        if (startOffset == filePos) {
-          return 0.0f;
-        } else {
-          long readBytes = filePos - startOffset;
-          long remainingBytes = Math.max(end - filePos, 0);
-          return Math.min(1.0f, (float)(readBytes) / (float)(readBytes + remainingBytes));
-        }
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
-        return 0.0f;
-      }
-    }
-
-    @Override
-    public Tuple next() throws IOException {
-      try {
-        if (currentIdx == validIdx) {
-          if (eof) {
-            return null;
-          } else {
-            page();
-
-            if(currentIdx == validIdx){
-              return null;
-            }
-          }
-        }
-
-        long offset = -1;
-        if(!isCompress()){
-          offset = fileOffsets.get(currentIdx);
-        }
-
-        byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
-            rowLengthList.get(currentIdx), delimiter, targetColumnIndexes);
-        currentIdx++;
-        return new LazyTuple(schema, cells, offset, nullChars, serde);
-      } catch (Throwable t) {
-        LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
-        LOG.error("Tuple list current index: " + currentIdx, t);
-        throw new IOException(t);
-      }
-    }
-
-    private boolean isCompress() {
-      return codec != null;
-    }
-
-    @Override
-    public void reset() throws IOException {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-        decompressor = null;
-      }
-
-      init();
-    }
-
-    @Override
-    public void close() throws IOException {
-      try {
-        if (tableStats != null) {
-          tableStats.setReadBytes(pos - startOffset);  // actual processed bytes (decompressed bytes + overhead)
-          tableStats.setNumRows(recordCount);
-        }
-
-        IOUtils.cleanup(LOG, reader, is, fis);
-        fs = null;
-        is = null;
-        fis = null;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("CSVScanner processed record:" + recordCount);
-        }
-      } finally {
-        if (decompressor != null) {
-          CodecPool.returnDecompressor(decompressor);
-          decompressor = null;
-        }
-      }
-    }
-
-    @Override
-    public boolean isProjectable() {
-      return true;
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public void setSearchCondition(Object expr) {
-    }
-
-    @Override
-    public void seek(long offset) throws IOException {
-      if(isCompress()) throw new UnsupportedException();
-
-      int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset);
-
-      if (tupleIndex > -1) {
-        this.currentIdx = tupleIndex;
-      } else if (isSplittable() && startOffset <= offset && offset <= end) { // offset must fall within this split
-        eof = false;
-        fis.seek(offset);
-        pos = offset;
-        reader.reset();
-        this.currentIdx = 0;
-        this.validIdx = 0;
-      } else {
-        throw new IOException("invalid offset " +
-            " < start : " +  startOffset + " , " +
-            "  end : " + end + " , " +
-            "  filePos : " + filePosition.getPos() + " , " +
-            "  input offset : " + offset + " >");
-      }
-    }
-
-    @Override
-    public long getNextOffset() throws IOException {
-      if(isCompress()) throw new UnsupportedException();
-
-      if (this.currentIdx == this.validIdx) {
-        if (fragmentable() <= 0) {
-          return -1;
-        } else {
-          page();
-          if(currentIdx == validIdx) return -1;
-        }
-      }
-      return fileOffsets.get(currentIdx);
-    }
-
-    @Override
-    public boolean isSplittable(){
-      return splittable;
-    }
-  }
-}

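For reference, the scanner deleted above follows the standard Scanner life cycle: init(), then next() until it returns null, then close(). Below is a minimal sketch of driving it with the pre-refactor class names (the enclosing CSVFile class name is from the pre-refactor tree); the table name, file path, and one-column schema are illustrative, not taken from this patch.

// Minimal sketch, not part of the patch: drives the pre-refactor
// CSVFile.CSVScanner through init() -> next() -> close().
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.CSVFile;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.FileFragment;

public class CSVScanSketch {
  public static void main(String[] args) throws Exception {
    TajoConf conf = new TajoConf();
    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);                 // illustrative schema

    Path path = new Path("/tmp/table1/data.csv");      // illustrative path
    FileSystem fs = path.getFileSystem(conf);
    long len = fs.getFileStatus(path).getLen();
    FileFragment fragment = new FileFragment("table1", path, 0, len);

    CSVFile.CSVScanner scanner = new CSVFile.CSVScanner(conf, schema, meta, fragment);
    scanner.init();                                    // opens the stream, pages the first block
    Tuple tuple;
    while ((tuple = scanner.next()) != null) {         // null signals the end of the fragment
      System.out.println(tuple);
    }
    scanner.close();
  }
}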
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
deleted file mode 100644
index 4f58e68..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-
-/**
- * Line reader for compressed splits
- *
- * Reading records from a compressed split is tricky, as the
- * LineRecordReader is using the reported compressed input stream
- * position directly to determine when a split has ended.  In addition the
- * compressed input stream is usually faking the actual byte position, often
- * updating it only after the first compressed block after the split is
- * accessed.
- *
- * Depending upon where the last compressed block of the split ends relative
- * to the record delimiters it can be easy to accidentally drop the last
- * record or duplicate the last record between this split and the next.
- *
- * Split end scenarios:
- *
- * 1) Last block of split ends in the middle of a record
- *      Nothing special that needs to be done here, since the compressed input
- *      stream will report a position after the split end once the record
- *      is fully read.  The consumer of the next split will discard the
- *      partial record at the start of the split normally, and no data is lost
- *      or duplicated between the splits.
- *
- * 2) Last block of split ends in the middle of a delimiter
- *      The line reader will continue to consume bytes into the next block to
- *      locate the end of the delimiter.  If a custom delimiter is being used
- *      then the next record must be read by this split or it will be dropped.
- *      The consumer of the next split will not recognize the partial
- *      delimiter at the beginning of its split and will discard it along with
- *      the next record.
- *
- *      However for the default delimiter processing there is a special case
- *      because CR, LF, and CRLF are all valid record delimiters.  If the
- *      block ends with a CR then the reader must peek at the next byte to see
- *      if it is an LF and therefore part of the same record delimiter.
- *      Peeking at the next byte is an access to the next block and triggers
- *      the stream to report the end of the split.  There are two cases based
- *      on the next byte:
- *
- *      A) The next byte is LF
- *           The split needs to end after the current record is returned.  The
- *           consumer of the next split will discard the first record, which
- *           is degenerate since LF is itself a delimiter, and start consuming
- *           records after that byte.  If the current split tries to read
- *           another record then the record will be duplicated between splits.
- *
- *      B) The next byte is not LF
- *           The current record will be returned but the stream will report
- *           the split has ended due to the peek into the next block.  If the
- *           next record is not read then it will be lost, as the consumer of
- *           the next split will discard it before processing subsequent
- *           records.  Therefore the next record beyond the reported split end
- *           must be consumed by this split to avoid data loss.
- *
- * 3) Last block of split ends at the beginning of a delimiter
- *      This is equivalent to case 1, as the reader will consume bytes into
- *      the next block and trigger the end of the split.  No further records
- *      should be read as the consumer of the next split will discard the
- *      (degenerate) record at the beginning of its split.
- *
- * 4) Last block of split ends at the end of a delimiter
- *      Nothing special needs to be done here. The reader will not start
- *      examining the bytes into the next block until the next record is read,
- *      so the stream will not report the end of the split just yet.  Once the
- *      next record is read then the next block will be accessed and the
- *      stream will indicate the end of the split.  The consumer of the next
- *      split will correctly discard the first record of its split, and no
- *      data is lost or duplicated.
- *
- *      If the default delimiter is used and the block ends at a CR then this
- *      is treated as case 2 since the reader does not yet know without
- *      looking at subsequent bytes whether the delimiter has ended.
- *
- * NOTE: It is assumed that compressed input streams *never* return bytes from
- *       multiple compressed blocks from a single read.  Failure to do so will
- *       violate the buffering performed by this class, as it will access
- *       bytes into the next block after the split before returning all of the
- *       records from the previous block.
- */
-
-public class CompressedSplitLineReader extends SplitLineReader {
-  SplitCompressionInputStream scin;
-  private boolean usingCRLF;
-  private boolean needAdditionalRecord = false;
-  private boolean finished = false;
-
-  public CompressedSplitLineReader(SplitCompressionInputStream in,
-                                   Configuration conf,
-                                   byte[] recordDelimiterBytes)
-      throws IOException {
-    super(in, conf, recordDelimiterBytes);
-    scin = in;
-    usingCRLF = (recordDelimiterBytes == null);
-  }
-
-  @Override
-  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
-      throws IOException {
-    int bytesRead = in.read(buffer);
-
-    // If the split ended in the middle of a record delimiter then we need
-    // to read one additional record, as the consumer of the next split will
-    // not recognize the partial delimiter as a record.
-    // However if using the default delimiter and the next character is a
-    // linefeed then next split will treat it as a delimiter all by itself
-    // and the additional record read should not be performed.
-    if (inDelimiter && bytesRead > 0) {
-      if (usingCRLF) {
-        needAdditionalRecord = (buffer[0] != '\n');
-      } else {
-        needAdditionalRecord = true;
-      }
-    }
-    return bytesRead;
-  }
-
-  @Override
-  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-    int bytesRead = 0;
-    if (!finished) {
-      // only allow at most one more record to be read after the stream
-      // reports the split ended
-      if (scin.getPos() > scin.getAdjustedEnd()) {
-        finished = true;
-      }
-
-      bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
-    }
-    return bytesRead;
-  }
-
-  @Override
-  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
-      , int maxBytesToConsume) throws IOException {
-    int bytesRead = 0;
-    if (!finished) {
-      // only allow at most one more record to be read after the stream
-      // reports the split ended
-      if (scin.getPos() > scin.getAdjustedEnd()) {
-        finished = true;
-      }
-
-      bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume);
-    }
-    return bytesRead;
-  }
-
-  @Override
-  public boolean needAdditionalRecordAfterSplit() {
-    return !finished && needAdditionalRecord;
-  }
-}

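The contract spelled out in the comment block above surfaces at exactly two points in the deleted CSVScanner: constructing the reader over a SplitCompressionInputStream, and consulting needAdditionalRecordAfterSplit() before declaring the split finished. Below is a condensed sketch of that caller-side pattern, assuming a splittable codec such as bzip2; the wrapper method itself is illustrative, while the calls inside it all appear in this diff.

// Condensed sketch of the caller-side pattern from CSVScanner: read one
// split of a block-compressed file without dropping or duplicating the
// records that straddle the split boundary.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.tajo.storage.CompressedSplitLineReader;

public class CompressedSplitReadSketch {
  static void readSplit(Configuration conf, FSDataInputStream fis,
                        SplittableCompressionCodec codec,
                        long start, long end) throws IOException {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    try {
      SplitCompressionInputStream cin = codec.createInputStream(fis, decompressor,
          start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK);
      CompressedSplitLineReader reader = new CompressedSplitLineReader(cin, conf, null);
      long adjustedStart = cin.getAdjustedStart();   // the codec may move both
      long adjustedEnd = cin.getAdjustedEnd();       // boundaries to block edges

      Text line = new Text();
      if (adjustedStart != 0) {
        reader.readLine(line, 0, Integer.MAX_VALUE); // discard the partial first record
      }
      while (reader.readLine(line, Integer.MAX_VALUE, Integer.MAX_VALUE) > 0) {
        // process line ...
        // Stop only when the split is exhausted AND the reader does not owe
        // the one extra record described in case 2 above.
        if (cin.getPos() > adjustedEnd && !reader.needAdditionalRecordAfterSplit()) {
          break;
        }
      }
    } finally {
      CodecPool.returnDecompressor(decompressor);
    }
  }
}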
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
deleted file mode 100644
index 8841a31..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-public class DataLocation {
-  private String host;
-  private int volumeId;
-
-  public DataLocation(String host, int volumeId) {
-    this.host = host;
-    this.volumeId = volumeId;
-  }
-
-  public String getHost() {
-    return host;
-  }
-
-  public int getVolumeId() {
-    return volumeId;
-  }
-
-  @Override
-  public String toString() {
-    return "DataLocation{" +
-        "host=" + host +
-        ", volumeId=" + volumeId +
-        '}';
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
deleted file mode 100644
index 2396349..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class DiskDeviceInfo {
-	private int id;
-	private String name;
-	
-	private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>();
-
-	public DiskDeviceInfo(int id) {
-		this.id = id;
-	}
-	
-	public int getId() {
-		return id;
-	}
-
-	public String getName() {
-		return name;
-	}
-
-	public void setName(String name) {
-		this.name = name;
-	}
-	
-	@Override
-	public String toString() {
-		return id + "," + name;
-	}
-
-	public void addMountPath(DiskMountInfo diskMountInfo) {
-		mountInfos.add(diskMountInfo);
-	}
-
-	public List<DiskMountInfo> getMountInfos() {
-		return mountInfos;
-	}
-
-	public void setMountInfos(List<DiskMountInfo> mountInfos) {
-		this.mountInfos = mountInfos;
-	}
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java
deleted file mode 100644
index 22f18ba..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-public class DiskInfo {
-	private int id;
-	private String partitionName;
-	private String mountPath;
-	
-	private long capacity;
-	private long used;
-	
-	public DiskInfo(int id, String partitionName) {
-		this.id = id;
-		this.partitionName = partitionName;
-	}
-
-	public int getId() {
-		return id;
-	}
-
-	public void setId(int id) {
-		this.id = id;
-	}
-
-	public String getPartitionName() {
-		return partitionName;
-	}
-
-	public void setPartitionName(String partitionName) {
-		this.partitionName = partitionName;
-	}
-
-	public String getMountPath() {
-		return mountPath;
-	}
-
-	public void setMountPath(String mountPath) {
-		this.mountPath = mountPath;
-	}
-
-	public long getCapacity() {
-		return capacity;
-	}
-
-	public void setCapacity(long capacity) {
-		this.capacity = capacity;
-	}
-
-	public long getUsed() {
-		return used;
-	}
-
-	public void setUsed(long used) {
-		this.used = used;
-	}
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
deleted file mode 100644
index aadb0e7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Objects;
-
-public class DiskMountInfo implements Comparable<DiskMountInfo> {
-	private String mountPath;
-	
-	private long capacity;
-	private long used;
-	
-	private int deviceId;
-	
-	public DiskMountInfo(int deviceId, String mountPath) {
-		this.deviceId = deviceId;
-		this.mountPath = mountPath;
-	}
-
-	public String getMountPath() {
-		return mountPath;
-	}
-
-	public void setMountPath(String mountPath) {
-		this.mountPath = mountPath;
-	}
-
-	public long getCapacity() {
-		return capacity;
-	}
-
-	public void setCapacity(long capacity) {
-		this.capacity = capacity;
-	}
-
-	public long getUsed() {
-		return used;
-	}
-
-	public void setUsed(long used) {
-		this.used = used;
-	}
-
-	public int getDeviceId() {
-		return deviceId;
-	}
-
-  @Override
-  public boolean equals(Object obj){
-    if (!(obj instanceof DiskMountInfo)) return false;
-
-    return compareTo((DiskMountInfo) obj) == 0;
-  }
-
-  @Override
-  public int hashCode(){
-    return Objects.hashCode(mountPath);
-  }
-
-	@Override
-	public int compareTo(DiskMountInfo other) {
-		String path1 = mountPath;
-		String path2 = other.mountPath;
-		
-		int path1Depth = "/".equals(path1) ? 0 : path1.split("/", -1).length - 1;
-		int path2Depth = "/".equals(path2) ? 0 : path2.split("/", -1).length - 1;
-		
-		if(path1Depth > path2Depth) {
-			return -1;
-		} else if(path1Depth < path2Depth) {
-			return 1;
-		} else {
-			int path1Length = path1.length();
-			int path2Length = path2.length();
-			
-			if(path1Length < path2Length) {
-				return 1;
-			} else if(path1Length > path2Length) {
-				return -1;
-			} else {
-				return path1.compareTo(path2);
-			}
-		}
-	}
-}
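
The compareTo above orders mount points by descending path depth, then by descending path length, then lexicographically, so the most specific mount point always sorts first, presumably so a lookup can walk the sorted list and stop at the first mount path that prefixes a given file path. A small worked example of that ordering (ids and paths are illustrative):

// Illustrative only: natural ordering of DiskMountInfo puts the most
// specific (deepest) mount path first.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.tajo.storage.DiskMountInfo;

public class MountOrderSketch {
  public static void main(String[] args) {
    List<DiskMountInfo> mounts = new ArrayList<DiskMountInfo>();
    mounts.add(new DiskMountInfo(0, "/"));
    mounts.add(new DiskMountInfo(1, "/data"));
    mounts.add(new DiskMountInfo(2, "/data/disk1"));
    Collections.sort(mounts);
    for (DiskMountInfo mount : mounts) {
      System.out.println(mount.getMountPath()); // /data/disk1, /data, /
    }
  }
}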