Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:33 UTC

[18/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
deleted file mode 100644
index 85e5f41..0000000
--- a/tajo-storage/src/main/resources/storage-default.xml
+++ /dev/null
@@ -1,180 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<configuration>
-  <property>
-    <name>tajo.storage.manager.maxReadBytes</name>
-    <value>8388608</value>
-    <description></description>
-  </property>
-
-  <property>
-    <name>tajo.storage.manager.concurrency.perDisk</name>
-    <value>1</value>
-    <description></description>
-  </property>
-
-  <!--- Registered Scanner Handler -->
-  <property>
-    <name>tajo.storage.scanner-handler</name>
-    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
-  </property>
-
-  <!--- Fragment Class Configurations -->
-  <property>
-    <name>tajo.storage.fragment.textfile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.csv.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.raw.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.rcfile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.row.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.parquet.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.sequencefile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.avro.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.hbase.class</name>
-    <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
-  </property>
-
-  <!--- Scanner Handler -->
-  <property>
-    <name>tajo.storage.scanner-handler.textfile.class</name>
-    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.csv.class</name>
-    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.raw.class</name>
-    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.rcfile.class</name>
-    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.rowfile.class</name>
-    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.parquet.class</name>
-    <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.sequencefile.class</name>
-    <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.avro.class</name>
-    <value>org.apache.tajo.storage.avro.AvroScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.hbase.class</name>
-    <value>org.apache.tajo.storage.hbase.HBaseScanner</value>
-  </property>
-  
-  <!--- Appender Handler -->
-  <property>
-    <name>tajo.storage.appender-handler</name>
-    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.textfile.class</name>
-    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.csv.class</name>
-    <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.raw.class</name>
-    <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.rcfile.class</name>
-    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.rowfile.class</name>
-    <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.parquet.class</name>
-    <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.sequencefile.class</name>
-    <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.avro.class</name>
-    <value>org.apache.tajo.storage.avro.AvroAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.hbase.class</name>
-    <value>org.apache.tajo.storage.hbase.HFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.hfile.class</name>
-    <value>org.apache.tajo.storage.hbase.HFileAppender</value>
-  </property>
-</configuration>
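
For context, the deleted storage-default.xml above wires each store type to its scanner and appender implementation through "tajo.storage.scanner-handler.<type>.class" and "tajo.storage.appender-handler.<type>.class" keys. Below is a minimal sketch of how such a key could be resolved to a class by reflection; the helper class, method name, and Configuration-based lookup are illustrative assumptions, not Tajo's actual loader code:

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical helper illustrating the registration convention used by
    // the deleted storage-default.xml; not Tajo's real resolution code.
    public class ScannerHandlerLookup {
      public static Class<?> resolveScannerClass(Configuration conf, String storeType)
          throws ClassNotFoundException {
        // e.g. "parquet" -> org.apache.tajo.storage.parquet.ParquetScanner,
        // "csv" -> org.apache.tajo.storage.CSVFile$CSVScanner (binary name of a nested class)
        String key = "tajo.storage.scanner-handler." + storeType.toLowerCase() + ".class";
        String className = conf.get(key);
        if (className == null) {
          throw new IllegalArgumentException("No scanner registered for store type: " + storeType);
        }
        return Class.forName(className);
      }
    }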

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
deleted file mode 100644
index cf8a54e..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-
-public class HttpFileServer {
-  private final static Log LOG = LogFactory.getLog(HttpFileServer.class);
-
-  private final InetSocketAddress addr;
-  private InetSocketAddress bindAddr;
-  private ServerBootstrap bootstrap = null;
-  private ChannelFactory factory = null;
-  private ChannelGroup channelGroup = null;
-
-  public HttpFileServer(final InetSocketAddress addr) {
-    this.addr = addr;
-    this.factory = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
-        2);
-
-    // Configure the server.
-    this.bootstrap = new ServerBootstrap(factory);
-    // Set up the event pipeline factory.
-    this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory());
-    this.channelGroup = new DefaultChannelGroup();
-  }
-
-  public HttpFileServer(String bindaddr) {
-    this(NetUtils.createSocketAddr(bindaddr));
-  }
-
-  public void start() {
-    // Bind and start to accept incoming connections.
-    Channel channel = bootstrap.bind(addr);
-    channelGroup.add(channel);    
-    this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
-    LOG.info("HttpFileServer starts up ("
-        + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
-        + ")");
-  }
-  
-  public InetSocketAddress getBindAddress() {
-    return this.bindAddr;
-  }
-
-  public void stop() {
-    ChannelGroupFuture future = channelGroup.close();
-    future.awaitUninterruptibly();
-    factory.releaseExternalResources();
-
-    LOG.info("HttpFileServer shutdown ("
-        + this.bindAddr.getAddress().getHostAddress() + ":"
-        + this.bindAddr.getPort() + ")");
-  }
-}
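
The deleted HttpFileServer above exposes a small lifecycle: construct with an address, start(), read the bound address, stop(). The following usage sketch drives exactly that API as shown in the diff; binding to "localhost:0" to obtain an ephemeral port is an assumption about NetUtils.createSocketAddr and the underlying bind, not something the diff states:

    import java.net.InetSocketAddress;
    import org.apache.tajo.HttpFileServer;

    public class HttpFileServerDemo {
      public static void main(String[] args) {
        // Port 0 requests an ephemeral port (an assumption about the bind behavior).
        HttpFileServer server = new HttpFileServer("localhost:0");
        server.start();
        InetSocketAddress bound = server.getBindAddress();
        System.out.println("Serving at http://"
            + bound.getAddress().getHostAddress() + ":" + bound.getPort());
        // ... issue HTTP GETs against local files from a test ...
        server.stop();
      }
    }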

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
deleted file mode 100644
index 6c77317..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.RandomAccessFile;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-/**
- * This is an implementation copied from HttpStaticFileServerHandler.java of Netty 3.6.
- */
-public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
-
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-    HttpRequest request = (HttpRequest) e.getMessage();
-    if (request.getMethod() != GET) {
-      sendError(ctx, METHOD_NOT_ALLOWED);
-      return;
-    }
-
-    final String path = sanitizeUri(request.getUri());
-    if (path == null) {
-      sendError(ctx, FORBIDDEN);
-      return;
-    }
-
-    File file = new File(path);
-    if (file.isHidden() || !file.exists()) {
-      sendError(ctx, NOT_FOUND);
-      return;
-    }
-    if (!file.isFile()) {
-      sendError(ctx, FORBIDDEN);
-      return;
-    }
-
-    RandomAccessFile raf;
-    try {
-      raf = new RandomAccessFile(file, "r");
-    } catch (FileNotFoundException fnfe) {
-      sendError(ctx, NOT_FOUND);
-      return;
-    }
-    long fileLength = raf.length();
-
-    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-    setContentLength(response, fileLength);
-    setContentTypeHeader(response);
-
-    Channel ch = e.getChannel();
-
-    // Write the initial line and the header.
-    ch.write(response);
-
-    // Write the content.
-    ChannelFuture writeFuture;
-    if (ch.getPipeline().get(SslHandler.class) != null) {
-      // Cannot use zero-copy with HTTPS.
-      writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
-    } else {
-      // No encryption - use zero-copy.
-      final FileRegion region =
-          new DefaultFileRegion(raf.getChannel(), 0, fileLength);
-      writeFuture = ch.write(region);
-      writeFuture.addListener(new ChannelFutureProgressListener() {
-        public void operationComplete(ChannelFuture future) {
-          region.releaseExternalResources();
-        }
-
-        public void operationProgressed(
-            ChannelFuture future, long amount, long current, long total) {
-          System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount);
-        }
-      });
-    }
-
-    // Decide whether to close the connection or not.
-    if (!isKeepAlive(request)) {
-      // Close the connection when the whole content is written out.
-      writeFuture.addListener(ChannelFutureListener.CLOSE);
-    }
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-      throws Exception {
-    Channel ch = e.getChannel();
-    Throwable cause = e.getCause();
-    if (cause instanceof TooLongFrameException) {
-      sendError(ctx, BAD_REQUEST);
-      return;
-    }
-
-    cause.printStackTrace();
-    if (ch.isConnected()) {
-      sendError(ctx, INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  private static String sanitizeUri(String uri) {
-    // Decode the path.
-    try {
-      uri = URLDecoder.decode(uri, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      try {
-        uri = URLDecoder.decode(uri, "ISO-8859-1");
-      } catch (UnsupportedEncodingException e1) {
-        throw new Error();
-      }
-    }
-
-    // Convert file separators.
-    uri = uri.replace('/', File.separatorChar);
-
-    // Simplistic dumb security check.
-    // You will have to do something serious in the production environment.
-    if (uri.contains(File.separator + '.') ||
-        uri.contains('.' + File.separator) ||
-        uri.startsWith(".") || uri.endsWith(".")) {
-      return null;
-    }
-
-    return uri;
-  }
-
-  private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
-    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-    response.setContent(ChannelBuffers.copiedBuffer(
-        "Failure: " + status.toString() + "\r\n",
-        CharsetUtil.UTF_8));
-
-    // Close the connection as soon as the error message is sent.
-    ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
-  }
-
-  /**
-   * Sets the content type header for the HTTP Response
-   *
-   * @param response
-   *            HTTP response
-   */
-  private static void setContentTypeHeader(HttpResponse response) {
-    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-  }
-
-}
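
Note that sanitizeUri above implements only a simplistic traversal check: after URL decoding and separator conversion, any path containing a dot adjacent to a file separator, or a leading or trailing dot, is rejected and answered with FORBIDDEN. A standalone sketch of that same check follows; the class and method names are illustrative, and the example outputs assume a Unix-style File.separator:

    import java.io.File;

    public class PathCheckDemo {
      // Mirrors the simplistic check in HttpFileServerHandler.sanitizeUri above.
      static boolean isForbidden(String uri) {
        return uri.contains(File.separator + '.')
            || uri.contains('.' + File.separator)
            || uri.startsWith(".")
            || uri.endsWith(".");
      }

      public static void main(String[] args) {
        System.out.println(isForbidden("/tmp/data.csv"));      // false: plain file path is served
        System.out.println(isForbidden("/tmp/../etc/passwd")); // true: ".." is rejected as FORBIDDEN
      }
    }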

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
deleted file mode 100644
index cecf93b..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-// Uncomment the following lines if you want HTTPS
-//import javax.net.ssl.SSLEngine;
-//import org.jboss.netty.example.securechat.SecureChatSslContextFactory;
-//import org.jboss.netty.handler.ssl.SslHandler;
-
-// This class is copied from HttpStaticFileServerPipelineFactory.java of Netty 3.6.
-public class HttpFileServerPipelineFactory implements ChannelPipelineFactory {
-  public ChannelPipeline getPipeline() throws Exception {
-    // Create a default pipeline implementation.
-    ChannelPipeline pipeline = pipeline();
-
-    // Uncomment the following lines if you want HTTPS
-    //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
-    //engine.setUseClientMode(false);
-    //pipeline.addLast("ssl", new SslHandler(engine));
-
-    pipeline.addLast("decoder", new HttpRequestDecoder());
-    pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
-    pipeline.addLast("encoder", new HttpResponseEncoder());
-    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
-
-    pipeline.addLast("handler", new HttpFileServerHandler());
-    return pipeline;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
deleted file mode 100644
index ea46fa7..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
-import org.apache.tajo.storage.text.DelimitedTextFile;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestCompressionStorages {
-  private TajoConf conf;
-  private static String TEST_PATH = "target/test-data/TestCompressionStorages";
-
-  private StoreType storeType;
-  private Path testDir;
-  private FileSystem fs;
-
-  public TestCompressionStorages(StoreType type) throws IOException {
-    this.storeType = type;
-    conf = new TajoConf();
-
-    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    fs = testDir.getFileSystem(conf);
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> generateParameters() {
-    return Arrays.asList(new Object[][]{
-        {StoreType.CSV},
-        {StoreType.RCFILE},
-        {StoreType.SEQUENCEFILE},
-        {StoreType.TEXTFILE}
-    });
-  }
-
-  @Test
-  public void testDeflateCodecCompressionData() throws IOException {
-    storageCompressionTest(storeType, DeflateCodec.class);
-  }
-
-  @Test
-  public void testGzipCodecCompressionData() throws IOException {
-    if (storeType == StoreType.RCFILE || storeType == StoreType.SEQUENCEFILE) {
-      if (ZlibFactory.isNativeZlibLoaded(conf)) {
-        storageCompressionTest(storeType, GzipCodec.class);
-      }
-    } else {
-      storageCompressionTest(storeType, GzipCodec.class);
-    }
-  }
-
-  @Test
-  public void testSnappyCodecCompressionData() throws IOException {
-    if (SnappyCodec.isNativeCodeLoaded()) {
-      storageCompressionTest(storeType, SnappyCodec.class);
-    }
-  }
-
-  @Test
-  public void testLz4CodecCompressionData() throws IOException {
-    if (NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded()) {
-      storageCompressionTest(storeType, Lz4Codec.class);
-    }
-  }
-
-  private void storageCompressionTest(StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.FLOAT4);
-    schema.addColumn("name", Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta(storeType);
-    meta.putOption("compression.codec", codec.getCanonicalName());
-    meta.putOption("compression.type", SequenceFile.CompressionType.BLOCK.name());
-    meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName());
-    meta.putOption("sequencefile.serde", TextSerializerDeserializer.class.getName());
-
-    String fileName = "Compression_" + codec.getSimpleName();
-    Path tablePath = new Path(testDir, fileName);
-    Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
-    appender.enableStats();
-
-    appender.init();
-
-    String extension = "";
-    if (appender instanceof CSVFile.CSVAppender) {
-      extension = ((CSVFile.CSVAppender) appender).getExtension();
-    } else if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) {
-      extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
-    }
-
-    int tupleNum = 100000;
-    VTuple vTuple;
-
-    for (int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(3);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createFloat4((float) i));
-      vTuple.put(2, DatumFactory.createText(String.valueOf(i)));
-      appender.addTuple(vTuple);
-    }
-    appender.close();
-
-    TableStats stat = appender.getStats();
-    assertEquals(tupleNum, stat.getNumRows().longValue());
-    tablePath = tablePath.suffix(extension);
-    FileStatus status = fs.getFileStatus(tablePath);
-    long fileLen = status.getLen();
-    FileFragment[] tablets = new FileFragment[1];
-    tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
-
-    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
-
-    if (StoreType.CSV == storeType) {
-      if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
-        assertTrue(scanner.isSplittable());
-      } else {
-        assertFalse(scanner.isSplittable());
-      }
-    }
-    scanner.init();
-
-    if (storeType == StoreType.SEQUENCEFILE) {
-      assertTrue(scanner instanceof SequenceFileScanner);
-      Writable key = ((SequenceFileScanner) scanner).getKey();
-      assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
-    }
-
-    int tupleCnt = 0;
-    Tuple tuple;
-    while ((tuple = scanner.next()) != null) {
-      tupleCnt++;
-    }
-    scanner.close();
-    assertEquals(tupleNum, tupleCnt);
-    assertNotSame(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
-    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
deleted file mode 100644
index 17a8da7..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3.S3FileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.s3.InMemoryFileSystemStore;
-import org.apache.tajo.storage.s3.SmallBlockS3FileSystem;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class TestFileSystems {
-
-  protected byte[] data = null;
-
-  private static String TEST_PATH = "target/test-data/TestFileSystem";
-  private TajoConf conf = null;
-  private FileStorageManager sm = null;
-  private FileSystem fs = null;
-  Path testDir;
-
-  public TestFileSystems(FileSystem fs) throws IOException {
-    conf = new TajoConf();
-
-    if(fs instanceof S3FileSystem){
-      conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "10");
-      fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
-    }
-    this.fs = fs;
-    sm = StorageManager.getFileStorageManager(conf);
-    testDir = getTestDir(this.fs, TEST_PATH);
-  }
-
-  public Path getTestDir(FileSystem fs, String dir) throws IOException {
-    Path path = new Path(dir);
-    if(fs.exists(path))
-      fs.delete(path, true);
-
-    fs.mkdirs(path);
-
-    return fs.makeQualified(path);
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> generateParameters() {
-    return Arrays.asList(new Object[][] {
-        {new SmallBlockS3FileSystem(new InMemoryFileSystemStore())},
-    });
-  }
-
-  @Test
-  public void testBlockSplit() throws IOException {
-
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT4);
-    schema.addColumn("name", Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
-    Tuple[] tuples = new Tuple[4];
-    for (int i = 0; i < tuples.length; i++) {
-      tuples[i] = new VTuple(3);
-      tuples[i]
-          .put(new Datum[] { DatumFactory.createInt4(i),
-              DatumFactory.createInt4(i + 32),
-              DatumFactory.createText("name" + i) });
-    }
-
-    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
-        "table.csv");
-    fs.mkdirs(path.getParent());
-
-    Appender appender = sm.getAppender(meta, schema, path);
-    appender.init();
-    for (Tuple t : tuples) {
-      appender.addTuple(t);
-    }
-    appender.close();
-    FileStatus fileStatus = fs.getFileStatus(path);
-
-    List<Fragment> splits = sm.getSplits("table", meta, schema, path);
-    int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
-    assertEquals(splitSize, splits.size());
-
-    for (Fragment fragment : splits) {
-      assertTrue(fragment.getLength() <= fileStatus.getBlockSize());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
deleted file mode 100644
index 387fed5..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestFrameTuple {
-  private Tuple tuple1;
-  private Tuple tuple2;
-
-  @Before
-  public void setUp() throws Exception {
-    tuple1 = new VTuple(11);
-    tuple1.put(new Datum[] {
-        DatumFactory.createBool(true),
-        DatumFactory.createBit((byte) 0x99),
-        DatumFactory.createChar('9'),
-        DatumFactory.createInt2((short) 17),
-        DatumFactory.createInt4(59),
-        DatumFactory.createInt8(23l),
-        DatumFactory.createFloat4(77.9f),
-        DatumFactory.createFloat8(271.9f),
-        DatumFactory.createText("hyunsik"),
-        DatumFactory.createBlob("hyunsik".getBytes()),
-        DatumFactory.createInet4("192.168.0.1")
-    });
-    
-    tuple2 = new VTuple(11);
-    tuple2.put(new Datum[] {
-        DatumFactory.createBool(true),
-        DatumFactory.createBit((byte) 0x99),
-        DatumFactory.createChar('9'),
-        DatumFactory.createInt2((short) 17),
-        DatumFactory.createInt4(59),
-        DatumFactory.createInt8(23l),
-        DatumFactory.createFloat4(77.9f),
-        DatumFactory.createFloat8(271.9f),
-        DatumFactory.createText("hyunsik"),
-        DatumFactory.createBlob("hyunsik".getBytes()),
-        DatumFactory.createInet4("192.168.0.1")
-    });
-  }
-
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @Test
-  public final void testFrameTuple() {
-    Tuple frame = new FrameTuple(tuple1, tuple2);
-    assertEquals(22, frame.size());
-    for (int i = 0; i < 22; i++) {
-      assertTrue(frame.contains(i));
-    }
-    
-    assertEquals(DatumFactory.createInt8(23l), frame.get(5));
-    assertEquals(DatumFactory.createInt8(23l), frame.get(16));
-    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10));
-    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21));
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
deleted file mode 100644
index c6149f7..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.util.BytesUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class TestLazyTuple {
-
-  Schema schema;
-  byte[][] textRow;
-  byte[] nullbytes;
-  SerializerDeserializer serde;
-
-  @Before
-  public void setUp() {
-    nullbytes = "\\N".getBytes();
-
-    schema = new Schema();
-    schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
-    schema.addColumn("col2", TajoDataTypes.Type.BIT);
-    schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7);
-    schema.addColumn("col4", TajoDataTypes.Type.INT2);
-    schema.addColumn("col5", TajoDataTypes.Type.INT4);
-    schema.addColumn("col6", TajoDataTypes.Type.INT8);
-    schema.addColumn("col7", TajoDataTypes.Type.FLOAT4);
-    schema.addColumn("col8", TajoDataTypes.Type.FLOAT8);
-    schema.addColumn("col9", TajoDataTypes.Type.TEXT);
-    schema.addColumn("col10", TajoDataTypes.Type.BLOB);
-    schema.addColumn("col11", TajoDataTypes.Type.INET4);
-    schema.addColumn("col12", TajoDataTypes.Type.INT4);
-    schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE);
-
-    StringBuilder sb = new StringBuilder();
-    sb.append(DatumFactory.createBool(true)).append('|');
-    sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|');
-    sb.append(DatumFactory.createChar("str")).append('|');
-    sb.append(DatumFactory.createInt2((short) 17)).append('|');
-    sb.append(DatumFactory.createInt4(59)).append('|');
-    sb.append(DatumFactory.createInt8(23l)).append('|');
-    sb.append(DatumFactory.createFloat4(77.9f)).append('|');
-    sb.append(DatumFactory.createFloat8(271.9f)).append('|');
-    sb.append(DatumFactory.createText("str2")).append('|');
-    sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|');
-    sb.append(DatumFactory.createInet4("192.168.0.1")).append('|');
-    sb.append(new String(nullbytes)).append('|');
-    sb.append(NullDatum.get());
-    textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|');
-    serde = new TextSerializerDeserializer();
-  }
-
-  @Test
-  public void testGetDatum() {
-
-    LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde);
-    assertEquals(DatumFactory.createBool(true), t1.get(0));
-    assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
-    assertEquals(DatumFactory.createChar("str"), t1.get(2));
-    assertEquals(DatumFactory.createInt2((short) 17), t1.get(3));
-    assertEquals(DatumFactory.createInt4(59), t1.get(4));
-    assertEquals(DatumFactory.createInt8(23l), t1.get(5));
-    assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6));
-    assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7));
-    assertEquals(DatumFactory.createText("str2"), t1.get(8));
-    assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9));
-    assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10));
-    assertEquals(NullDatum.get(), t1.get(11));
-    assertEquals(NullDatum.get(), t1.get(12));
-  }
-
-  @Test
-  public void testContain() {
-    int colNum = schema.size();
-
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(3, DatumFactory.createInt4(1));
-    t1.put(7, DatumFactory.createInt4(1));
-
-    assertTrue(t1.contains(0));
-    assertFalse(t1.contains(1));
-    assertFalse(t1.contains(2));
-    assertTrue(t1.contains(3));
-    assertFalse(t1.contains(4));
-    assertFalse(t1.contains(5));
-    assertFalse(t1.contains(6));
-    assertTrue(t1.contains(7));
-    assertFalse(t1.contains(8));
-    assertFalse(t1.contains(9));
-    assertFalse(t1.contains(10));
-    assertFalse(t1.contains(11));
-    assertFalse(t1.contains(12));
-  }
-
-  @Test
-  public void testPut() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-    t1.put(0, DatumFactory.createText("str"));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(11, DatumFactory.createFloat4(0.76f));
-
-    assertTrue(t1.contains(0));
-    assertTrue(t1.contains(1));
-
-    assertEquals(t1.getText(0), "str");
-    assertEquals(t1.get(1).asInt4(), 2);
-    assertTrue(t1.get(11).asFloat4() == 0.76f);
-  }
-
-  @Test
-  public void testEquals() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(3, DatumFactory.createInt4(2));
-
-    t2.put(0, DatumFactory.createInt4(1));
-    t2.put(1, DatumFactory.createInt4(2));
-    t2.put(3, DatumFactory.createInt4(2));
-
-    assertEquals(t1, t2);
-
-    Tuple t3 = new VTuple(colNum);
-    t3.put(0, DatumFactory.createInt4(1));
-    t3.put(1, DatumFactory.createInt4(2));
-    t3.put(3, DatumFactory.createInt4(2));
-    assertEquals(t1, t3);
-    assertEquals(t2, t3);
-
-    LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1);
-    assertNotSame(t1, t4);
-  }
-
-  @Test
-  public void testHashCode() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(3, DatumFactory.createInt4(2));
-    t1.put(4, DatumFactory.createText("str"));
-
-    t2.put(0, DatumFactory.createInt4(1));
-    t2.put(1, DatumFactory.createInt4(2));
-    t2.put(3, DatumFactory.createInt4(2));
-    t2.put(4, DatumFactory.createText("str"));
-
-    assertEquals(t1.hashCode(), t2.hashCode());
-
-    Tuple t3 = new VTuple(colNum);
-    t3.put(0, DatumFactory.createInt4(1));
-    t3.put(1, DatumFactory.createInt4(2));
-    t3.put(3, DatumFactory.createInt4(2));
-    t3.put(4, DatumFactory.createText("str"));
-    assertEquals(t1.hashCode(), t3.hashCode());
-    assertEquals(t2.hashCode(), t3.hashCode());
-
-    Tuple t4 = new VTuple(5);
-    t4.put(0, DatumFactory.createInt4(1));
-    t4.put(1, DatumFactory.createInt4(2));
-    t4.put(4, DatumFactory.createInt4(2));
-
-    assertNotSame(t1.hashCode(), t4.hashCode());
-  }
-
-  @Test
-  public void testPutTuple() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(2, DatumFactory.createInt4(3));
-
-
-    Schema schema2 = new Schema();
-    schema2.addColumn("col1", TajoDataTypes.Type.INT8);
-    schema2.addColumn("col2", TajoDataTypes.Type.INT8);
-
-    LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1);
-    t2.put(0, DatumFactory.createInt4(4));
-    t2.put(1, DatumFactory.createInt4(5));
-
-    t1.put(3, t2);
-
-    for (int i = 0; i < 5; i++) {
-      assertEquals(i + 1, t1.get(i).asInt4());
-    }
-  }
-
-  @Test
-  public void testInvalidNumber() {
-    byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|');
-    Schema schema = new Schema();
-    schema.addColumn("col1", TajoDataTypes.Type.INT2);
-    schema.addColumn("col2", TajoDataTypes.Type.INT4);
-    schema.addColumn("col3", TajoDataTypes.Type.INT8);
-    schema.addColumn("col4", TajoDataTypes.Type.FLOAT4);
-    schema.addColumn("col5", TajoDataTypes.Type.FLOAT8);
-
-    LazyTuple tuple = new LazyTuple(schema, bytes, 0);
-    assertEquals(bytes.length, tuple.size());
-
-    for (int i = 0; i < tuple.size(); i++){
-      assertEquals(NullDatum.get(), tuple.get(i));
-    }
-  }
-
-  @Test
-  public void testClone() throws CloneNotSupportedException {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(3, DatumFactory.createInt4(2));
-    t1.put(4, DatumFactory.createText("str"));
-
-    LazyTuple t2 = (LazyTuple) t1.clone();
-    assertNotSame(t1, t2);
-    assertEquals(t1, t2);
-
-    assertSame(t1.get(4), t2.get(4));
-
-    t1.clear();
-    assertFalse(t1.equals(t2));
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
deleted file mode 100644
index 1a4bdba..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.text.ByteBufLineReader;
-import org.apache.tajo.storage.text.DelimitedTextFile;
-import org.apache.tajo.storage.text.DelimitedLineReader;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.*;
-
-public class TestLineReader {
-	private static String TEST_PATH = "target/test-data/TestLineReader";
-
-  @Test
-  public void testByteBufLineReader() throws IOException {
-    TajoConf conf = new TajoConf();
-    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    FileSystem fs = testDir.getFileSystem(conf);
-
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT8);
-    schema.addColumn("comment", Type.TEXT);
-    schema.addColumn("comment2", Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
-    Path tablePath = new Path(testDir, "line.data");
-    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(meta, schema,
-        tablePath);
-    appender.enableStats();
-    appender.init();
-    int tupleNum = 10000;
-    VTuple vTuple;
-
-    for (int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(4);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createInt8(25l));
-      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
-      vTuple.put(3, NullDatum.get());
-      appender.addTuple(vTuple);
-    }
-    appender.close();
-
-    FileStatus status = fs.getFileStatus(tablePath);
-
-    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
-    assertEquals(status.getLen(), channel.available());
-    ByteBufLineReader reader = new ByteBufLineReader(channel);
-    assertEquals(status.getLen(), reader.available());
-
-    long totalRead = 0;
-    int i = 0;
-    AtomicInteger bytes = new AtomicInteger();
-    for(;;){
-      ByteBuf buf = reader.readLineBuf(bytes);
-      if(buf == null) break;
-
-      totalRead += bytes.get();
-      i++;
-    }
-    IOUtils.cleanup(null, reader, channel, fs);
-    assertEquals(tupleNum, i);
-    assertEquals(status.getLen(), totalRead);
-    assertEquals(status.getLen(), reader.readBytes());
-  }
-
-  @Test
-  public void testLineDelimitedReader() throws IOException {
-    TajoConf conf = new TajoConf();
-    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    FileSystem fs = testDir.getFileSystem(conf);
-
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT8);
-    schema.addColumn("comment", Type.TEXT);
-    schema.addColumn("comment2", Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
-    meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
-
-    Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName());
-    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(meta, schema,
-        tablePath);
-    appender.enableStats();
-    appender.init();
-    int tupleNum = 10000;
-    VTuple vTuple;
-
-    long splitOffset = 0;
-    for (int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(4);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createInt8(25l));
-      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
-      vTuple.put(3, NullDatum.get());
-      appender.addTuple(vTuple);
-
-      if(i == (tupleNum / 2)){
-        splitOffset = appender.getOffset();
-      }
-    }
-    String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
-    appender.close();
-
-    tablePath = tablePath.suffix(extension);
-    FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset);
-    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // if file is compressed, will read to EOF
-    assertTrue(reader.isCompressed());
-    assertFalse(reader.isReadable());
-    reader.init();
-    assertTrue(reader.isReadable());
-
-
-    int i = 0;
-    while(reader.isReadable()){
-      ByteBuf buf = reader.readLine();
-      if(buf == null) break;
-      i++;
-    }
-
-    IOUtils.cleanup(null, reader, fs);
-    assertEquals(tupleNum, i);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
deleted file mode 100644
index cc4aa51..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.TUtil;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestMergeScanner {
-  private TajoConf conf;
-  StorageManager sm;
-  private static String TEST_PATH = "target/test-data/TestMergeScanner";
-
-  private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA =
-      "{\n" +
-      "  \"type\": \"record\",\n" +
-      "  \"namespace\": \"org.apache.tajo\",\n" +
-      "  \"name\": \"testMultipleFiles\",\n" +
-      "  \"fields\": [\n" +
-      "    { \"name\": \"id\", \"type\": \"int\" },\n" +
-      "    { \"name\": \"file\", \"type\": \"string\" },\n" +
-      "    { \"name\": \"name\", \"type\": \"string\" },\n" +
-      "    { \"name\": \"age\", \"type\": \"long\" }\n" +
-      "  ]\n" +
-      "}\n";
-
-  private Path testDir;
-  private StoreType storeType;
-  private FileSystem fs;
-
-  public TestMergeScanner(StoreType storeType) {
-    this.storeType = storeType;
-  }
-
-  @Parameters
-  public static Collection<Object[]> generateParameters() {
-    return Arrays.asList(new Object[][] {
-        {StoreType.CSV},
-        {StoreType.RAW},
-        {StoreType.RCFILE},
-        {StoreType.PARQUET},
-        {StoreType.SEQUENCEFILE},
-        {StoreType.AVRO},
-        // RowFile requires Byte-buffer read support, so we omitted RowFile.
-        //{StoreType.ROWFILE},
-    });
-  }
-
-  @Before
-  public void setup() throws Exception {
-    conf = new TajoConf();
-    conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
-    conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
-    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    fs = testDir.getFileSystem(conf);
-    sm = StorageManager.getFileStorageManager(conf, testDir);
-  }
-
-  @Test
-  public void testMultipleFiles() throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("file", Type.TEXT);
-    schema.addColumn("name", Type.TEXT);
-    schema.addColumn("age", Type.INT8);
-
-    KeyValueSet options = new KeyValueSet();
-    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
-    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
-    if (storeType == StoreType.AVRO) {
-      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
-                     TEST_MULTIPLE_FILES_AVRO_SCHEMA);
-    }
-
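-    // Write the same 10,000 rows into two separate files so the merge
-    // scanner below can read them back as one stream.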
-    Path table1Path = new Path(testDir, storeType + "_1.data");
-    Appender appender1 = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, table1Path);
-    appender1.enableStats();
-    appender1.init();
-    int tupleNum = 10000;
-    VTuple vTuple;
-
-    for(int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(4);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createText("hyunsik"));
-      vTuple.put(2, DatumFactory.createText("jihoon"));
-      vTuple.put(3, DatumFactory.createInt8(25L));
-      appender1.addTuple(vTuple);
-    }
-    appender1.close();
-
-    TableStats stat1 = appender1.getStats();
-    if (stat1 != null) {
-      assertEquals(tupleNum, stat1.getNumRows().longValue());
-    }
-
-    Path table2Path = new Path(testDir, storeType + "_2.data");
-    Appender appender2 = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, table2Path);
-    appender2.enableStats();
-    appender2.init();
-
-    for(int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(4);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createText("hyunsik"));
-      vTuple.put(2, DatumFactory.createText("jihoon"));
-      vTuple.put(3, DatumFactory.createInt8(25L));
-      appender2.addTuple(vTuple);
-    }
-    appender2.close();
-
-    TableStats stat2 = appender2.getStats();
-    if (stat2 != null) {
-      assertEquals(tupleNum, stat2.getNumRows().longValue());
-    }
-
-    FileStatus status1 = fs.getFileStatus(table1Path);
-    FileStatus status2 = fs.getFileStatus(table2Path);
-    Fragment[] fragment = new Fragment[2];
-    fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen());
-    fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen());
-
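-    // Project only "id" (column 0) and "name" (column 2); for projectable
-    // formats, the non-projected slots must come back null.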
-    Schema targetSchema = new Schema();
-    targetSchema.addColumn(schema.getColumn(0));
-    targetSchema.addColumn(schema.getColumn(2));
-
-    Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.newList(fragment), targetSchema);
-    assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable());
-
-    scanner.init();
-    int totalCounts = 0;
-    Tuple tuple;
-    while ((tuple = scanner.next()) != null) {
-      totalCounts++;
-      if (isProjectableStorage(meta.getStoreType())) {
-        assertNotNull(tuple.get(0));
-        assertNull(tuple.get(1));
-        assertNotNull(tuple.get(2));
-        assertNull(tuple.get(3));
-      }
-    }
-    scanner.close();
-
-    assertEquals(tupleNum * 2, totalCounts);
-  }
-
-  private static boolean isProjectableStorage(StoreType type) {
-    switch (type) {
-      case RCFILE:
-      case PARQUET:
-      case SEQUENCEFILE:
-      case CSV:
-      case AVRO:
-        return true;
-      default:
-        return false;
-    }
-  }
-}
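
The merge behavior exercised above is easy to picture as sequential iteration
over per-fragment scanners: each scanner is drained in turn, and null is
returned only once every fragment is exhausted. A minimal, hypothetical sketch
(ScannerLike is a stand-in for org.apache.tajo.storage.Scanner; this is not
Tajo's actual MergeScanner implementation):

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

interface ScannerLike<T> {
  T next() throws IOException;   // returns null at the end of a fragment
  void close() throws IOException;
}

class SequentialMergeSketch<T> implements ScannerLike<T> {
  private final Iterator<ScannerLike<T>> scanners;
  private ScannerLike<T> current;

  SequentialMergeSketch(List<ScannerLike<T>> scanners) {
    this.scanners = scanners.iterator();
    this.current = this.scanners.hasNext() ? this.scanners.next() : null;
  }

  @Override
  public T next() throws IOException {
    while (current != null) {
      T tuple = current.next();
      if (tuple != null) {
        return tuple;              // rows remain in the current fragment
      }
      current.close();             // fragment exhausted; move to the next one
      current = scanners.hasNext() ? scanners.next() : null;
    }
    return null;                   // all fragments consumed
  }

  @Override
  public void close() throws IOException {
    if (current != null) {
      current.close();
    }
  }
}

This matches the assertion above that totalCounts equals tupleNum * 2: two
fragments of 10,000 rows each read back as a single 20,000-row stream.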

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
deleted file mode 100644
index 12ea551..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.util.CharsetUtil;
-import org.apache.tajo.storage.text.FieldSplitProcessor;
-import org.apache.tajo.storage.text.LineSplitProcessor;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static io.netty.util.ReferenceCountUtil.releaseLater;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestSplitProcessor {
-
-  @Test
-  public void testFieldSplitProcessor() throws IOException {
-    String data = "abc||de";
-    final ByteBuf buf = releaseLater(
-        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
-
-    final int len = buf.readableBytes();
-    FieldSplitProcessor processor = new FieldSplitProcessor('|');
-
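-    // In "abc||de" the delimiters sit at absolute indices 3 and 4,
-    // and the trailing "de" contains none (hence -1).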
-    assertEquals(3, buf.forEachByte(0, len, processor));
-    assertEquals(4, buf.forEachByte(4, len - 4, processor));
-    assertEquals(-1, buf.forEachByte(5, len - 5, processor));
-  }
-
-  @Test
-  public void testLineSplitProcessor() throws IOException {
-    String data = "abc\r\n\n";
-    final ByteBuf buf = releaseLater(
-        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
-
-    final int len = buf.readableBytes();
-    LineSplitProcessor processor = new LineSplitProcessor();
-
-    // find CR
-    assertEquals(3, buf.forEachByte(0, len, processor));
-
-    // find CRLF
-    assertEquals(4, buf.forEachByte(4, len - 4, processor));
-    assertEquals('\n', buf.getByte(4));
-    // need to skip LF
-    assertTrue(processor.isPrevCharCR());
-
-    // find LF
-    assertEquals(5, buf.forEachByte(5, len - 5, processor)); //line length is zero
-  }
-}
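
Both processors above follow Netty's byte-walking contract: forEachByte(index,
length, processor) calls process(byte) on each byte and stops at the first one
for which process returns false, returning that byte's absolute index (or -1
if the scan runs off the end). A minimal sketch of a delimiter finder in the
style of FieldSplitProcessor, assuming Netty 4.0's ByteBufProcessor interface
(not the project's actual implementation):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class DelimiterFinderSketch implements ByteBufProcessor {
  private final byte delimiter;

  public DelimiterFinderSketch(char delimiter) {
    this.delimiter = (byte) delimiter;
  }

  @Override
  public boolean process(byte value) {
    return value != delimiter;   // returning false stops the scan here
  }

  public static void main(String[] args) {
    ByteBuf buf = Unpooled.copiedBuffer("abc||de", CharsetUtil.ISO_8859_1);
    // Prints 3: the absolute index of the first '|'.
    System.out.println(buf.forEachByte(0, buf.readableBytes(),
        new DelimiterFinderSketch('|')));
    buf.release();
  }
}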

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
deleted file mode 100644
index 13aeef6..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-public class TestStorageManager {
-  private TajoConf conf;
-  private static final String TEST_PATH = "target/test-data/TestStorageManager";
-  StorageManager sm = null;
-  private Path testDir;
-  private FileSystem fs;
-
-  @Before
-  public void setUp() throws Exception {
-    conf = new TajoConf();
-    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    fs = testDir.getFileSystem(conf);
-    sm = StorageManager.getFileStorageManager(conf, testDir);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @Test
-  public final void testGetScannerAndAppender() throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT4);
-    schema.addColumn("name", Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
-    Tuple[] tuples = new Tuple[4];
-    for (int i = 0; i < tuples.length; i++) {
-      tuples[i] = new VTuple(3);
-      tuples[i].put(new Datum[] {
-          DatumFactory.createInt4(i),
-          DatumFactory.createInt4(i + 32),
-          DatumFactory.createText("name" + i)});
-    }
-
-    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
-    fs.mkdirs(path.getParent());
-    Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, path);
-    appender.init();
-    for (Tuple t : tuples) {
-      appender.addTuple(t);
-    }
-    appender.close();
-
-    Scanner scanner = StorageManager.getFileStorageManager(conf).getFileScanner(meta, schema, path);
-    scanner.init();
-    int i = 0;
-    while (scanner.next() != null) {
-      i++;
-    }
-    assertEquals(4, i);
-  }
-
-  @Test
-  public void testGetSplit() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
-    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
-    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
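-    // Block-location metadata is disabled here, so every volume id below
-    // resolves to -1 (unknown).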
-
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(1).build();
-
-    int testCount = 10;
-    Path tablePath = new Path("/testGetSplit");
-    try {
-      DistributedFileSystem fs = cluster.getFileSystem();
-
-      // Create test partitions
-      List<Path> partitions = Lists.newArrayList();
-      for (int i = 0; i < testCount; i++) {
-        Path tmpFile = new Path(tablePath, String.valueOf(i));
-        DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADL);
-        partitions.add(tmpFile);
-      }
-
-      assertTrue(fs.exists(tablePath));
-      FileStorageManager sm = StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
-
-      Schema schema = new Schema();
-      schema.addColumn("id", Type.INT4);
-      schema.addColumn("age",Type.INT4);
-      schema.addColumn("name",Type.TEXT);
-      TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
-      List<Fragment> splits = Lists.newArrayList();
-      // Get FileFragments in partition batch
-      splits.addAll(sm.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
-      assertEquals(testCount, splits.size());
-      // -1 is unknown volumeId
-      assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
-
-      splits.clear();
-      splits.addAll(sm.getSplits("data", meta, schema,
-          partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
-      assertEquals(testCount / 2, splits.size());
-      assertEquals(1, splits.get(0).getHosts().length);
-      assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
-      fs.close();
-    } finally {
-      cluster.shutdown();
-
-      File dir = new File(testDataPath);
-      dir.delete();
-    }
-  }
-
-  @Test
-  public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
-    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
-    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
-
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(2).build();
-
-    int testCount = 10;
-    Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
-    try {
-      DistributedFileSystem fs = cluster.getFileSystem();
-
-      // Create test files
-      for (int i = 0; i < testCount; i++) {
-        Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
-        DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADL);
-      }
-      assertTrue(fs.exists(tablePath));
-      FileStorageManager sm = StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
-
-      Schema schema = new Schema();
-      schema.addColumn("id", Type.INT4);
-      schema.addColumn("age", Type.INT4);
-      schema.addColumn("name", Type.TEXT);
-      TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
-      List<Fragment> splits = Lists.newArrayList();
-      splits.addAll(sm.getSplits("data", meta, schema, tablePath));
-
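-      // Replication factor 2 on a two-datanode cluster: each split should
-      // report two hosts and two real (non -1) disk volume ids.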
-      assertEquals(testCount, splits.size());
-      assertEquals(2, splits.get(0).getHosts().length);
-      assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
-      assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
-      fs.close();
-    } finally {
-      cluster.shutdown();
-
-      File dir = new File(testDataPath);
-      dir.delete();
-    }
-  }
-}
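
The two HDFS tests above share the same harness shape: build a MiniDFSCluster,
do the work against its DistributedFileSystem, and shut the cluster down in a
finally block. A stripped-down sketch of that lifecycle, assuming Hadoop's
hadoop-hdfs test artifact is on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class MiniDfsLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, "target/test-data/minidfs");
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1).build();
    try {
      DistributedFileSystem fs = cluster.getFileSystem();
      // 10-byte file, replication factor 1, fixed seed for reproducibility.
      DFSTestUtil.createFile(fs, new Path("/demo/file.dat"), 10, (short) 1, 0L);
      System.out.println(fs.exists(new Path("/demo/file.dat")));
    } finally {
      cluster.shutdown();   // mirror the finally blocks in the tests above
    }
  }
}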