You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2015/12/03 04:34:40 UTC

tajo git commit: TAJO-1960: Separate HDFS space handler and S3 space handler.

Repository: tajo
Updated Branches:
  refs/heads/master f2552bfb9 -> 3852ae3d0


TAJO-1960: Separate HDFS space handler and S3 space handler.

Closes #868


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/3852ae3d
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/3852ae3d
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/3852ae3d

Branch: refs/heads/master
Commit: 3852ae3d0d3fcb30ebff73f087ce34f3e1fc5bfb
Parents: f2552bf
Author: JaeHwa Jung <bl...@apache.org>
Authored: Thu Dec 3 12:29:09 2015 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Thu Dec 3 12:29:09 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |    2 +
 tajo-storage/pom.xml                            |    2 +
 .../src/main/resources/storage-default.json     |    8 +-
 tajo-storage/tajo-storage-s3/pom.xml            |  233 ++++
 .../apache/tajo/storage/s3/S3TableSpace.java    | 1164 ++++++++++++++++++
 .../tajo/storage/s3/MockS3FileSystem.java       |  110 ++
 .../tajo/storage/s3/TestS3TableSpace.java       |   62 +
 7 files changed, 1577 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e0fd693..688c2f5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -135,6 +135,8 @@ Release 0.12.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1960: Separate HDFS space handler and S3 space handler. (jaehwa)
+
     TAJO-1856: Add a description about the relationship of tablespace, managed table, 
     and external table to Tablespace section of Table Management chapter. 
     (Contributed by Jongyoung Park. Committed by jihoon)

http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/tajo-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index a6b9b8a..4881e2c 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -39,6 +39,7 @@
     <module>tajo-storage-hbase</module>
     <module>tajo-storage-jdbc</module>
     <module>tajo-storage-pgsql</module>
+    <module>tajo-storage-s3</module>
   </modules>
 
   <build>
@@ -135,6 +136,7 @@
                       run cp -r ${basedir}/tajo-storage-common/target/tajo-storage-common-${project.version}*.jar .
                       run cp -r ${basedir}/tajo-storage-hdfs/target/tajo-storage-hdfs-${project.version}*.jar .
                       run cp -r ${basedir}/tajo-storage-hbase/target/tajo-storage-hbase-${project.version}*.jar .
+                      run cp -r ${basedir}/tajo-storage-s3/target/tajo-storage-s3-${project.version}*.jar .
 
                       echo
                       echo "Tajo Storage dist layout available at: ${project.build.directory}/tajo-storage-${project.version}"

http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
index a24f301..17ac3ba 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
@@ -17,19 +17,19 @@
       "default-format": "rowstore"
     },
     "s3": {
-      "handler": "org.apache.tajo.storage.FileTablespace",
+      "handler": "org.apache.tajo.storage.s3.S3TableSpace",
       "default-format": "text"
     },
     "s3a": {
-      "handler": "org.apache.tajo.storage.FileTablespace",
+      "handler": "org.apache.tajo.storage.s3.S3TableSpace",
       "default-format": "text"
     },
     "s3n": {
-      "handler": "org.apache.tajo.storage.FileTablespace",
+      "handler": "org.apache.tajo.storage.s3.S3TableSpace",
       "default-format": "text"
     },
     "swift": {
-      "handler": "org.apache.tajo.storage.FileTablespace",
+      "handler": "org.apache.tajo.storage.s3.S3TableSpace",
       "default-format": "text"
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/tajo-storage/tajo-storage-s3/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/pom.xml b/tajo-storage/tajo-storage-s3/pom.xml
new file mode 100644
index 0000000..a9a541a
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/pom.xml
@@ -0,0 +1,233 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.12.0-SNAPSHOT</version>
+    <relativePath>../../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tajo-storage-s3</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo S3 storage</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>repository.jboss.org</id>
+      <url>https://repository.jboss.org/nexus/content/repositories/releases/
+      </url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/dataset/**</exclude>
+            <exclude>src/test/resources/queries/**</exclude>
+            <exclude>src/test/resources/results/**</exclude>
+          </excludes>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <skipTests>true</skipTests>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-hdfs</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>zookeeper</artifactId>
+          <groupId>org.apache.zookeeper</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>slf4j-api</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jersey-json</artifactId>
+          <groupId>com.sun.jersey</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <!-- Runs the unit tests of tajo-storage-s3, which are skipped by default. -->
+      <id>test-storage-s3</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration combine.self="override">
+              <systemProperties>
+                <tajo.test.enabled>TRUE</tajo.test.enabled>
+              </systemProperties>
+              <argLine>-Xms128m -Xmx1024m -Dfile.encoding=UTF-8</argLine>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </reporting>
+</project>

http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
new file mode 100644
index 0000000..7ac5425
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
@@ -0,0 +1,1164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import net.minidev.json.JSONObject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.Bytes;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.URI;
+import java.text.NumberFormat;
+import java.util.*;
+
+public class S3TableSpace extends Tablespace {
+
+  /** Accepts only paths whose last component does not begin with '_' or '.'. */
+  public static final PathFilter hiddenFileFilter = new PathFilter() {
+    @Override public boolean accept(Path candidate) {
+      return !(candidate.getName().startsWith("_") || candidate.getName().startsWith("."));
+    }
+  };
+  private static final Log LOG = LogFactory.getLog(S3TableSpace.class); // one shared logger for the class
+
+  // Result files are named part-<stage>-<task>-<seq>; one formatter per thread for each field.
+  static final String OUTPUT_FILE_PREFIX = "part-";
+
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_STAGE = new ThreadLocal<NumberFormat>() {
+    @Override
+    public NumberFormat initialValue() {
+      NumberFormat stageFormat = NumberFormat.getInstance();
+      stageFormat.setMinimumIntegerDigits(2); // stage id: at least two digits
+      stageFormat.setGroupingUsed(false);
+      return stageFormat;
+    }
+  };
+
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK = new ThreadLocal<NumberFormat>() {
+    @Override
+    public NumberFormat initialValue() {
+      NumberFormat taskFormat = NumberFormat.getInstance();
+      taskFormat.setMinimumIntegerDigits(6); // task id: at least six digits
+      taskFormat.setGroupingUsed(false);
+      return taskFormat;
+    }
+  };
+
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ = new ThreadLocal<NumberFormat>() {
+    @Override
+    public NumberFormat initialValue() {
+      NumberFormat seqFormat = NumberFormat.getInstance();
+      seqFormat.setMinimumIntegerDigits(3); // sequence number: at least three digits
+      seqFormat.setGroupingUsed(false);
+      return seqFormat;
+    }
+  };
+
+  private static final StorageProperty FileStorageProperties = new StorageProperty("TEXT", true, true, true, false);
+  private static final FormatProperty GeneralFileProperties = new FormatProperty(true, false, true);
+
+  protected FileSystem fs; // file system resolved from the tablespace URI in storageInit()
+  protected Path spacePath; // root path of this tablespace, built from the URI in storageInit()
+  protected Path stagingRootPath; // qualified STAGING_ROOT_DIR, set in storageInit()
+  protected boolean blocksMetadataEnabled; // value of DFS_HDFS_BLOCKS_METADATA_ENABLED read in storageInit()
+  private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0)); // baseline used by getDiskIds()
+
+  public S3TableSpace(String spaceName, URI uri, JSONObject config) {
+    super(spaceName, uri, config); // all construction work is delegated to Tablespace
+  }
+
+  @Override
+  protected void storageInit() throws IOException {
+    this.spacePath = new Path(uri);
+    this.fs = spacePath.getFileSystem(conf);
+    this.stagingRootPath = fs.makeQualified(new Path(conf.getVar(TajoConf.ConfVars.STAGING_ROOT_DIR)));
+    this.conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, fs.getUri().toString());
+
+    this.blocksMetadataEnabled =
+      conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+
+    if (!this.blocksMetadataEnabled) {
+      LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
+    }
+  }
+
+  /**
+   * Returns the content-summary length under {@code uri}; an I/O failure surfaces as TajoInternalError.
+   */
+  @Override
+  public long getTableVolume(URI uri) throws UnsupportedException {
+    try {
+      return fs.getContentSummary(new Path(uri)).getLength();
+    } catch (IOException ioe) {
+      throw new TajoInternalError(ioe);
+    }
+  }
+
+  @Override
+  public URI getRootUri() {
+    return fs.getUri(); // URI of the file system resolved for this tablespace
+  }
+
+  /** Opens a scanner for {@code path}, looking up the file status first and then delegating. */
+  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
+    throws IOException {
+    return getFileScanner(meta, schema, path, fs.getFileStatus(path));
+  }
+
+  /** Opens a scanner over one fragment spanning bytes 0 to {@code status.getLen()} of the file. */
+  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
+    final Fragment wholeFile = new FileFragment(path.getName(), path, 0, status.getLen());
+    return getScanner(meta, schema, wholeFile, null);
+  }
+
+  public FileSystem getFileSystem() {
+    return this.fs; // the file system assigned in storageInit()
+  }
+
+  /** Recursively deletes {@code tablePath} on the file system that owns that path. */
+  public void delete(Path tablePath) throws IOException {
+    tablePath.getFileSystem(conf).delete(tablePath, true);
+  }
+
+  /** Tells whether {@code path} exists on the file system that owns that path. */
+  public boolean exists(Path path) throws IOException {
+    return path.getFileSystem(conf).exists(path);
+  }
+
+  @Override
+  public URI getTableUri(String databaseName, String tableName) {
+    return StorageUtil.concatPath(spacePath, databaseName, tableName).toUri(); // spacePath + database + table
+  }
+
+  @VisibleForTesting
+  public Appender getAppender(TableMeta meta, Schema schema, Path filePath)
+    throws IOException {
+    return getAppender(null, null, meta, schema, filePath); // first two args of the full overload stay null (TODO confirm meaning)
+  }
+
+  /** Splits {@code spacePath/tableName} using the default block size of this tablespace's file system. */
+  public FileFragment[] split(String tableName) throws IOException {
+    return split(tableName, new Path(spacePath, tableName), fs.getDefaultBlockSize());
+  }
+
+  /** Splits {@code spacePath/tableName} into fragments of at most {@code fragmentSize} bytes. */
+  public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
+    return split(tableName, new Path(spacePath, tableName), fragmentSize);
+  }
+
+  /** Splits {@code tablePath} using the default block size of the file system that owns it. */
+  public FileFragment[] split(Path tablePath) throws IOException {
+    return split(tablePath.getName(), tablePath, tablePath.getFileSystem(conf).getDefaultBlockSize());
+  }
+
+  public FileFragment[] split(String tableName, Path tablePath) throws IOException {
+    return split(tableName, tablePath, fs.getDefaultBlockSize()); // block size of this tablespace's fs, not of tablePath's own
+  }
+
+  /**
+   * Cuts every entry listed directly under {@code tablePath} into fragments of at most
+   * {@code size} bytes; the last fragment of a file carries the remainder.
+   *
+   * @param tableName fragment id given to every produced fragment
+   * @param tablePath path whose listing is split
+   * @param size maximum fragment length in bytes; must be positive
+   * @throws IllegalArgumentException if {@code size} is not positive (the loop below would never end)
+   */
+  private FileFragment[] split(String tableName, Path tablePath, long size)
+    throws IOException {
+    Preconditions.checkArgument(size > 0, "fragment size must be positive: %s", size);
+    FileSystem tableFs = tablePath.getFileSystem(conf);
+    List<FileFragment> fragments = new ArrayList<>();
+
+    for (FileStatus file : tableFs.listStatus(tablePath)) {
+      long remaining = file.getLen();
+      long offset = 0;
+      // Emit full-size fragments while more than one fragment's worth of bytes is left.
+      while (remaining > size) {
+        fragments.add(new FileFragment(tableName, file.getPath(), offset, size));
+        offset += size;
+        remaining -= size;
+      }
+      // The tail (possibly the whole file, possibly empty) becomes the final fragment.
+      fragments.add(new FileFragment(tableName, file.getPath(), offset, remaining));
+    }
+
+    return fragments.toArray(new FileFragment[fragments.size()]);
+  }
+
+  /**
+   * Static variant of the fragment splitter: cuts every entry listed directly under
+   * {@code tablePath} into fragments of at most {@code size} bytes.
+   *
+   * @param conf configuration used to resolve the file system of {@code tablePath}
+   * @param tableName fragment id given to every produced fragment
+   * @param meta not used by this method
+   * @param size maximum fragment length in bytes; must be positive
+   * @throws IllegalArgumentException if {@code size} is not positive (the loop below would never end)
+   */
+  public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
+                                       Path tablePath, long size)
+    throws IOException {
+    Preconditions.checkArgument(size > 0, "fragment size must be positive: %s", size);
+    FileSystem tableFs = tablePath.getFileSystem(conf);
+    List<FileFragment> fragments = new ArrayList<>();
+
+    for (FileStatus file : tableFs.listStatus(tablePath)) {
+      long remaining = file.getLen();
+      long offset = 0;
+      // Emit full-size fragments while more than one fragment's worth of bytes is left.
+      while (remaining > size) {
+        fragments.add(new FileFragment(tableName, file.getPath(), offset, size));
+        offset += size;
+        remaining -= size;
+      }
+      // The tail (possibly the whole file, possibly empty) becomes the final fragment.
+      fragments.add(new FileFragment(tableName, file.getPath(), offset, remaining));
+    }
+    return fragments.toArray(new FileFragment[fragments.size()]);
+  }
+
+  /**
+   * Returns the content-summary length of {@code tablePath} on its own file system, or 0 if it is absent.
+   */
+  public long calculateSize(Path tablePath) throws IOException {
+    FileSystem tableFs = tablePath.getFileSystem(conf);
+    if (!tableFs.exists(tablePath)) {
+      return 0;
+    }
+    return tableFs.getContentSummary(tablePath).getLength();
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // FileInputFormat Area
+  /////////////////////////////////////////////////////////////////////////////
+  public Path getAppenderFilePath(TaskAttemptId taskAttemptId, Path workDir) {
+    if (taskAttemptId == null) {
+      // Without a task attempt id (the test-case path) the work directory itself is returned.
+      return workDir;
+    }
+    // The result file is named part-ss-nnnnnn-000 under the result directory of workDir: ss is the
+    // execution block (stage) id, nnnnnn the task id, and the sequence part is always formatted from 0.
+    Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME,
+      OUTPUT_FILE_PREFIX +
+        OUTPUT_FILE_FORMAT_STAGE.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId()) + "-" +
+        OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getTaskId().getId()) + "-" +
+        OUTPUT_FILE_FORMAT_SEQ.get().format(0));
+    LOG.info("Output File Path: " + outFilePath);
+
+    return outFilePath;
+  }
+
+  /**
+   * Proxy PathFilter that accepts a path only if all filters given in the
+   * constructor do. Used by the listPaths() to apply the built-in
+   * hiddenFileFilter together with a user provided one (if any).
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (!filter.accept(path)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Lists the visible files for the given input paths.
+   * Each path is expanded as a glob on this tablespace's file system; a matched
+   * directory contributes its direct children, a matched file contributes itself.
+   * Entries rejected by hiddenFileFilter (names starting with '_' or '.') are dropped.
+   * A path that does not exist or matches nothing is logged and skipped, so the
+   * returned list may be empty.
+   * Subclasses may override to, e.g., select only files matching a regular
+   * expression.
+   *
+   * @param dirs input paths or glob patterns; at least one is required
+   * @return the FileStatus objects found, in the order they were listed
+   * @throws IOException if no path is given or the file system fails
+   */
+  protected List<FileStatus> listStatus(Path... dirs) throws IOException {
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+
+    // Wrap the built-in hidden-file filter; further filters could be added to the list.
+    List<PathFilter> filters = new ArrayList<>();
+    filters.add(hiddenFileFilter);
+    PathFilter inputFilter = new MultiPathFilter(filters);
+
+    List<FileStatus> result = new ArrayList<>();
+    for (Path dir : dirs) {
+      FileStatus[] matches = fs.globStatus(dir, inputFilter);
+      if (matches == null) {
+        LOG.warn("Input path does not exist: " + dir);
+        continue;
+      }
+      if (matches.length == 0) {
+        LOG.warn("Input Pattern " + dir + " matches 0 files");
+        continue;
+      }
+      for (FileStatus match : matches) {
+        if (match.isDirectory()) {
+          // Only one level is expanded: children of a matched directory are not recursed into.
+          result.addAll(Arrays.asList(fs.listStatus(match.getPath(), inputFilter)));
+        } else {
+          result.add(match);
+        }
+      }
+    }
+
+    // Missing or non-matching inputs were only logged above; they never fail the listing,
+    // so callers must be prepared for an empty result.
+    LOG.info("Total input paths to process : " + result.size());
+    return result;
+  }
+
+  /**
+   * Is the given file splittable? Usually true, but if the file is
+   * stream compressed, it will not be.
+   * <p/>
+   * The answer comes from a scanner opened over the whole file; the scanner is
+   * closed before returning, even when the check itself fails.
+   *
+   * @param path the file to check
+   * @param status status of that file, used for its length
+   * @return whether the scanner reports the file as splittable
+   */
+  protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
+    Scanner scanner = getFileScanner(meta, schema, path, status);
+    try {
+      return scanner.isSplittable();
+    } finally {
+      scanner.close();
+    }
+  }
+
+  private static final double SPLIT_SLOP = 1.1;   // 10% slop: a remainder up to 1.1 x splitSize stays in one last split
+
+  /** Returns the index of the block containing {@code offset}; throws IllegalArgumentException if none does. */
+  protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+    if (blkLocations.length == 0) {
+      throw new IllegalArgumentException("Offset " + offset + " cannot be located: no block locations");
+    }
+    for (int i = 0; i < blkLocations.length; i++) {
+      long blockStart = blkLocations[i].getOffset();
+      if (blockStart <= offset && offset < blockStart + blkLocations[i].getLength()) {
+        return i;
+      }
+    }
+    BlockLocation last = blkLocations[blkLocations.length - 1];
+    long lastByte = last.getOffset() + last.getLength() - 1;
+    throw new IllegalArgumentException("Offset " + offset + " is outside of file (0.." + lastByte + ")");
+  }
+
+  /**
+   * Factory for a fragment built from {@code fragmentId}, {@code file}, {@code start} and {@code length}.
+   * Sub-classes can override it to produce sub-types of FileFragment.
+   */
+  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) {
+    return new FileFragment(fragmentId, file, start, length);
+  }
+
+  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length,
+                                   String[] hosts) {
+    return new FileFragment(fragmentId, file, start, length, hosts); // hosts are handed to the fragment unchanged
+  }
+
+  protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation)
+    throws IOException {
+    return new FileFragment(fragmentId, file, blockLocation); // presumably range and hosts come from blockLocation (confirm)
+  }
+
+  /**
+   * Builds a single fragment for a file that must not be split (e.g. a gzip-compressed text file).
+   * Its hosts are those holding the most blocks of the file, most frequent first.
+   *
+   * @param blkLocations block locations of the file; the first entry decides how many hosts are kept
+   */
+  protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
+                                      BlockLocation[] blkLocations) throws IOException {
+    // Count, per host, how many blocks of the file it stores.
+    Map<String, Integer> blocksPerHost = new HashMap<>();
+    for (BlockLocation blockLocation : blkLocations) {
+      for (String host : blockLocation.getHosts()) {
+        Integer seen = blocksPerHost.get(host);
+        blocksPerHost.put(host, seen == null ? 1 : seen + 1);
+      }
+    }
+    // Ascending by block count; the best hosts are read from the tail below.
+    List<Map.Entry<String, Integer>> ranked = new ArrayList<>(blocksPerHost.entrySet());
+    Collections.sort(ranked, new Comparator<Map.Entry<String, Integer>>() {
+      @Override
+      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
+        return v1.getValue().compareTo(v2.getValue());
+      }
+    });
+
+    // Never ask for more hosts than were counted (duplicate host names would otherwise overrun the list).
+    String[] hosts = new String[Math.min(blkLocations[0].getHosts().length, ranked.size())];
+    for (int i = 0; i < hosts.length; i++) {
+      hosts[i] = ranked.get(ranked.size() - 1 - i).getKey();
+    }
+    return new FileFragment(fragmentId, file, start, length, hosts);
+  }
+
+  /**
+   * Get the minimum split size, read from TajoConf.ConfVars.MINIMUM_SPLIT_SIZE.
+   *
+   * @return the minimum number of bytes that can be in a split
+   */
+  public long getMinSplitSize() {
+    return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
+  }
+
+  /**
+   * Disk id per volume: hashCode minus zeroVolumeId's hashCode, or -1 for null or non-positive hashes.
+   */
+  private int[] getDiskIds(VolumeId[] volumeIds) {
+    int[] diskIds = new int[volumeIds.length];
+    Arrays.fill(diskIds, -1);
+    for (int idx = 0; idx < volumeIds.length; idx++) {
+      VolumeId volume = volumeIds[idx];
+      if (volume != null && volume.hashCode() > 0) {
+        diskIds[idx] = volume.hashCode() - zeroVolumeId.hashCode();
+      }
+    }
+    return diskIds;
+  }
+
+  /**
+   * Lists the non-empty files under {@code inputs} and turns them into fragments (splits).
+   *
+   * @throws IOException if listing files or fetching block locations fails
+   */
+  public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
+    throws IOException {
+    // splits holds finished fragments; volumeSplits and blockLocations are paired up by setVolumeMeta() at the end
+
+    List<Fragment> splits = Lists.newArrayList();
+    List<Fragment> volumeSplits = Lists.newArrayList();
+    List<BlockLocation> blockLocations = Lists.newArrayList();
+
+    for (Path p : inputs) {
+      ArrayList<FileStatus> files = Lists.newArrayList();
+      if (fs.isFile(p)) {
+        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
+      } else {
+        files.addAll(listStatus(p));
+      }
+
+      int previousSplitSize = splits.size();
+      for (FileStatus file : files) {
+        Path path = file.getPath();
+        long length = file.getLen();
+        if (length > 0) { // zero-length files produce no split
+          // Get locations of blocks of file
+          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+          boolean splittable = isSplittable(meta, schema, path, file);
+          if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
+
+            if (splittable) {
+              for (BlockLocation blockLocation : blkLocations) {
+                volumeSplits.add(makeSplit(tableName, path, blockLocation));
+              }
+              blockLocations.addAll(Arrays.asList(blkLocations));
+
+            } else { // Non splittable
+              long blockSize = blkLocations[0].getLength();
+              if (blockSize >= length) { // the first block already covers the whole file
+                blockLocations.addAll(Arrays.asList(blkLocations));
+                for (BlockLocation blockLocation : blkLocations) {
+                  volumeSplits.add(makeSplit(tableName, path, blockLocation));
+                }
+              } else {
+                splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
+              }
+            }
+
+          } else {
+            if (splittable) {
+
+              long minSize = Math.max(getMinSplitSize(), 1);
+
+              long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one
+              long splitSize = Math.max(minSize, blockSize);
+              long bytesRemaining = length;
+
+              // for s3: cut splitSize-byte splits while the remainder exceeds splitSize by more than SPLIT_SLOP
+              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize,
+                  blkLocations[blkIndex].getHosts()));
+                bytesRemaining -= splitSize;
+              }
+              if (bytesRemaining > 0) {
+                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining,
+                  blkLocations[blkIndex].getHosts()));
+              }
+            } else { // Non splittable
+              splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
+            }
+          }
+        }
+      }
+      if(LOG.isDebugEnabled()){
+        LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize));
+      }
+    }
+
+    // Combine original fileFragments with new VolumeId information
+    setVolumeMeta(volumeSplits, blockLocations);
+    splits.addAll(volumeSplits);
+    LOG.info("Total # of splits: " + splits.size());
+    return splits;
+  }
+
+  private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations)
+    throws IOException {
+
+    int locationSize = blockLocations.size();
+    int splitSize = splits.size();
+    if (locationSize == 0 || splitSize == 0) return;
+
+    if (locationSize != splitSize) {
+      // splits and locations don't match up
+      LOG.warn("Number of block locations not equal to number of splits: "
+        + "#locations=" + locationSize
+        + " #splits=" + splitSize);
+      return;
+    }
+
+    DistributedFileSystem fs = (DistributedFileSystem) this.fs;
+    int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
+    int blockLocationIdx = 0;
+
+    Iterator<Fragment> iter = splits.iterator();
+    while (locationSize > blockLocationIdx) {
+
+      int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
+      List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
+      //BlockStorageLocation containing additional volume location information for each replica of each block.
+      BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations);
+
+      for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
+        ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
+        blockLocationIdx++;
+      }
+    }
+    LOG.info("# of splits with volumeId " + splitSize);
+  }
+
+  private static class InvalidInputException extends IOException {
+    List<IOException> errors;
+    public InvalidInputException(List<IOException> errors) {
+      this.errors = errors;
+    }
+
+    @Override
+    public String getMessage(){
+      StringBuffer sb = new StringBuffer();
+      int messageLimit = Math.min(errors.size(), 10);
+      for (int i = 0; i < messageLimit ; i ++) {
+        sb.append(errors.get(i).getMessage()).append("\n");
+      }
+
+      if(messageLimit < errors.size())
+        sb.append("skipped .....").append("\n");
+
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Creates the fragments of a whole table by scanning the location given by the table's URI.
+   * {@code filterCondition} is accepted but not used by this implementation.
+   */
+  @Override
+  public List<Fragment> getSplits(String inputSourceId,
+                                  TableDesc table,
+                                  @Nullable EvalNode filterCondition) throws IOException {
+    TableMeta tableMeta = table.getMeta();
+    Schema tableSchema = table.getSchema();
+    Path tableLocation = new Path(table.getUri());
+    return getSplits(inputSourceId, tableMeta, tableSchema, tableLocation);
+  }
+
+  @Override
+  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
+    if (!tableDesc.isExternal()) {
+      String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
+      String databaseName = splitted[0];
+      String simpleTableName = splitted[1];
+
+      // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
+      Path tablePath = StorageUtil.concatPath(spacePath, databaseName, simpleTableName);
+      tableDesc.setUri(tablePath.toUri());
+    } else {
+      Preconditions.checkState(tableDesc.getUri() != null, "ERROR: LOCATION must be given.");
+    }
+
+    Path path = new Path(tableDesc.getUri());
+
+    FileSystem fs = path.getFileSystem(conf);
+    TableStats stats = new TableStats();
+    if (tableDesc.isExternal()) {
+      if (!fs.exists(path)) {
+        LOG.error(path.toUri() + " does not exist");
+        throw new IOException("ERROR: " + path.toUri() + " does not exist");
+      }
+    } else {
+      fs.mkdirs(path);
+    }
+
+    long totalSize = 0;
+
+    try {
+      totalSize = calculateSize(path);
+    } catch (IOException e) {
+      LOG.warn("Cannot calculate the size of the relation", e);
+    }
+
+    stats.setNumBytes(totalSize);
+
+    if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing.
+      stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
+    }
+
+    tableDesc.setStats(stats);
+  }
+
+  @Override
+  public void purgeTable(TableDesc tableDesc) throws IOException {
+    try {
+      Path path = new Path(tableDesc.getUri());
+      FileSystem fs = path.getFileSystem(conf);
+      LOG.info("Delete table data dir: " + path);
+      fs.delete(path, true);
+    } catch (IOException e) {
+      throw new InternalError(e.getMessage());
+    }
+  }
+
+  /** Returns {@code FileStorageProperties}, the storage property object used by this tablespace. */
+  @Override
+  public StorageProperty getProperty() {
+    return FileStorageProperties;
+  }
+
+  /** Returns {@code GeneralFileProperties} for every table; {@code meta} is not consulted. */
+  @Override
+  public FormatProperty getFormatProperty(TableMeta meta) {
+    return GeneralFileProperties;
+  }
+
+  /** Does nothing in this implementation. */
+  @Override
+  public void close() {
+  }
+
+  /** Does nothing in this implementation; {@code node} is ignored. */
+  @Override
+  public void prepareTable(LogicalNode node) throws IOException {
+  }
+
+  /** Does nothing in this implementation; {@code node} is ignored. */
+  @Override
+  public void rollbackTable(LogicalNode node) throws IOException {
+  }
+
+  @Override
+  public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
+    String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, "");
+
+    Path stagingDir;
+    // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO
+    // So, this query results won't be materialized as a part of a table.
+    // The result will be temporarily written in the staging directory.
+    if (outputPath.isEmpty()) {
+      // for temporarily written in the storage directory
+      stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId));
+    } else {
+      Tablespace space = TablespaceManager.get(outputPath);
+      if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation
+        // If this space allows move operation, the staging directory will be underneath the final output table uri.
+        stagingDir = fs.makeQualified(StorageUtil.concatPath(outputPath, TMP_STAGING_DIR_PREFIX, queryId));
+      } else {
+        stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId));
+      }
+    }
+
+    return stagingDir.toUri();
+  }
+
+  // query submission directory is private!
+  /** Permission requested for staging directories: access for the owner only (octal 0700). */
+  final public static FsPermission STAGING_DIR_PERMISSION = FsPermission.createImmutable((short) 0700); // rwx------
+  /** Name of the directory, placed under an output table's URI, that holds per-query staging directories. */
+  public static final String TMP_STAGING_DIR_PREFIX = ".staging";
+
+  /**
+   * Creates the staging directory of a query, together with its result sub-directory
+   * ({@code TajoConstants.RESULT_DIR_NAME}).
+   *
+   * <p>The directory is created with {@link #STAGING_DIR_PERMISSION}. Afterwards its owner must be
+   * empty, the current user or the login user, and its permission is reset if the file system
+   * reports a different one.
+   *
+   * @param conf not used by this method
+   * @param queryId query id, forwarded to {@code getStagingUri}
+   * @param context query context, forwarded to {@code getStagingUri}
+   * @param meta table meta, forwarded to {@code getStagingUri}
+   * @return URI of the created staging directory
+   * @throws IOException if the staging directory already exists, is owned by an unexpected user,
+   *         or a file system call fails
+   */
+  public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context, TableMeta meta)
+    throws IOException {
+
+    String realUser = UserGroupInformation.getLoginUser().getShortUserName();
+    String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    Path stagingDir = new Path(getStagingUri(context, queryId, meta));
+
+    ////////////////////////////////////////////
+    // Create Output Directory
+    ////////////////////////////////////////////
+
+    if (fs.exists(stagingDir)) {
+      throw new IOException("The staging directory '" + stagingDir + "' already exists");
+    }
+    fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+
+    FileStatus stagingStatus = fs.getFileStatus(stagingDir);
+    String owner = stagingStatus.getOwner();
+
+    // An empty owner is accepted as it is; any other owner has to be one of the two known users.
+    if (!owner.isEmpty() && !owner.equals(currentUser) && !owner.equals(realUser)) {
+      throw new IOException("The ownership on the user's query " +
+        "directory " + stagingDir + " is not as expected. " +
+        "It is owned by " + owner + ". The directory must " +
+        "be owned by the submitter " + currentUser + " or " +
+        "by " + realUser);
+    }
+
+    FsPermission actualPermission = stagingStatus.getPermission();
+    if (!actualPermission.equals(STAGING_DIR_PERMISSION)) {
+      LOG.info("Permissions on staging directory " + stagingDir + " are " +
+        "incorrect: " + stagingStatus.getPermission() + ". Fixing permissions " +
+        "to correct value " + STAGING_DIR_PERMISSION);
+      fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+    }
+
+    fs.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
+
+    return stagingDir.toUri();
+  }
+
+  /** Does nothing in this implementation: no check is made on the output schema. */
+  @Override
+  public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) {
+  }
+
+  /**
+   * Commits the query output by delegating to {@code commitOutputData(queryContext, true)}.
+   * {@code finalEbId}, {@code plan}, {@code schema} and {@code tableDesc} are not used here.
+   *
+   * @return the path returned by {@code commitOutputData}
+   */
+  @Override
+  public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan,
+                          Schema schema, TableDesc tableDesc) throws IOException {
+    return commitOutputData(queryContext, true);
+  }
+
+  /** Always returns {@code null}; none of the arguments is used by this implementation. */
+  @Override
+  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
+                                          Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange)
+    throws IOException {
+    return null;
+  }
+
+  /**
+   * Finalizes result data. Tajo stores result data in the staging directory.
+   * If the query context names an output table URI, the staged result is moved to that location
+   * (INSERT OVERWRITE, INSERT INTO or CTAS, depending on the context) and the staging area is removed.
+   * Otherwise the result stays in the staging directory.
+   *
+   * <p>For INSERT OVERWRITE the existing data is first moved to a backup directory inside the staging
+   * directory; if the commit fails, the method tries to move that backup into place again.
+   *
+   * @param queryContext The query property
+   * @param changeFileSeq If true change result file name with max sequence.
+   * @return Saved path: the output table location, or the staging result directory when no output
+   *         table is set
+   * @throws java.io.IOException if any step of the commit fails; the underlying failure is attached as cause
+   */
+  protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException {
+    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
+    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    Path finalOutputDir;
+    if (!queryContext.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) {
+      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_URI));
+      try {
+        FileSystem fs = stagingResultDir.getFileSystem(conf);
+
+        if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO
+
+          // It moves the original table into the temporary location.
+          // Then it moves the new result table into the original table location.
+          // Upon failed, it recovers the original table if possible.
+          boolean movedToOldTable = false;
+          boolean committed = false;
+          Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+          ContentSummary summary = fs.getContentSummary(stagingResultDir);
+
+          // When inserting empty data into a partitioned table, check if keep existing data need to be remove or not.
+          boolean overwriteEnabled = queryContext.getBool(SessionVars.PARTITION_NO_RESULT_OVERWRITE_ENABLED);
+
+          // If existing data doesn't need to keep, check if there are some files.
+          if ( (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty())
+            && (!overwriteEnabled || summary.getFileCount() > 0L)) {
+            // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
+            // renaming directory.
+            Map<Path, Path> renameDirs = new HashMap<>();
+            // This is a map for recovering existing partition directory. A key is current directory and a value is
+            // temporary directory to back up.
+            Map<Path, Path> recoveryDirs = new HashMap<>();
+
+            try {
+              if (!fs.exists(finalOutputDir)) {
+                fs.mkdirs(finalOutputDir);
+              }
+
+              visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
+                renameDirs, oldTableDir);
+
+              // Rename target partition directories
+              for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+                // Backup existing data files for recovering
+                if (fs.exists(entry.getValue())) {
+                  // Literal replacement: a path must not be interpreted as a regular expression,
+                  // and '$' or '\' in the backup location must not be treated as group references.
+                  String recoveryPathString = entry.getValue().toString().replace(finalOutputDir.toString(),
+                    oldTableDir.toString());
+                  Path recoveryPath = new Path(recoveryPathString);
+                  fs.rename(entry.getValue(), recoveryPath);
+                  recoveryDirs.put(entry.getValue(), recoveryPath);
+                }
+                // Delete existing directory
+                fs.delete(entry.getValue(), true);
+                // Rename staging directory to final output directory
+                fs.rename(entry.getKey(), entry.getValue());
+              }
+
+            } catch (IOException ioe) {
+              // Remove created dirs
+              for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+                fs.delete(entry.getValue(), true);
+              }
+
+              // Recovery renamed dirs
+              for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
+                // key = original partition directory, value = its backup.
+                // Clear the target, never the backup: deleting the backup here would lose the old data.
+                fs.delete(entry.getKey(), true);
+                fs.rename(entry.getValue(), entry.getKey());
+              }
+
+              throw new IOException(ioe.getMessage(), ioe);
+            }
+          } else { // no partition
+            try {
+
+              // if the final output dir exists, move all contents to the temporary table dir.
+              // Otherwise, just make the final output dir. As a result, the final output dir will be empty.
+              if (fs.exists(finalOutputDir)) {
+                fs.mkdirs(oldTableDir);
+
+                for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) {
+                  fs.rename(status.getPath(), oldTableDir);
+                }
+
+                movedToOldTable = fs.exists(oldTableDir);
+              } else { // if the parent does not exist, make its parent directory.
+                fs.mkdirs(finalOutputDir);
+              }
+
+              // Move the results to the final output dir.
+              for (FileStatus status : fs.listStatus(stagingResultDir)) {
+                fs.rename(status.getPath(), finalOutputDir);
+              }
+
+              // Check the final output dir
+              committed = fs.exists(finalOutputDir);
+
+            } catch (IOException ioe) {
+              // recover the old table
+              if (movedToOldTable && !committed) {
+
+                // if commit is failed, recover the old data
+                for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) {
+                  fs.delete(status.getPath(), true);
+                }
+
+                for (FileStatus status : fs.listStatus(oldTableDir)) {
+                  fs.rename(status.getPath(), finalOutputDir);
+                }
+              }
+
+              throw new IOException(ioe.getMessage(), ioe);
+            }
+          }
+        } else {
+          String queryType = queryContext.get(QueryVars.COMMAND_TYPE);
+
+          if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table
+
+            NumberFormat fmt = NumberFormat.getInstance();
+            fmt.setGroupingUsed(false);
+            fmt.setMinimumIntegerDigits(3);
+
+            if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
+              for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+                if (eachFile.isFile()) {
+                  LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
+                  continue;
+                }
+                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq);
+              }
+            } else {
+              int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
+              for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+                if (eachFile.getPath().getName().startsWith("_")) {
+                  continue;
+                }
+                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq);
+              }
+            }
+            // checking all file moved and remove empty dir
+            verifyAllFileMoved(fs, stagingResultDir);
+            FileStatus[] files = fs.listStatus(stagingResultDir);
+            if (files != null && files.length != 0) {
+              for (FileStatus eachFile: files) {
+                LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
+              }
+            }
+          } else { // CREATE TABLE AS SELECT (CTAS)
+            if (fs.exists(finalOutputDir)) {
+              for (FileStatus status : fs.listStatus(stagingResultDir)) {
+                fs.rename(status.getPath(), finalOutputDir);
+              }
+            } else {
+              fs.rename(stagingResultDir, finalOutputDir);
+            }
+            LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
+          }
+        }
+
+        // remove the staging directory if the final output dir is given.
+        // NOTE(review): this removes the PARENT of this query's staging directory, i.e. the
+        // directory shared by all query ids at that location - confirm this is intended.
+        Path stagingDirRoot = stagingDir.getParent();
+        fs.delete(stagingDirRoot, true);
+      } catch (Throwable t) {
+        // Log with the throwable so that the stack trace is not lost.
+        LOG.error(t.getMessage(), t);
+        throw new IOException(t);
+      }
+    } else {
+      finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    }
+
+    return finalOutputDir;
+  }
+
+  /**
+   * Attach the sequence number to the output file name and then move the file into the final result path.
+   * A directory is handled recursively: its counterpart below {@code finalOutputPath} is created if
+   * necessary and each contained entry whose name does not start with "_" is moved with the next
+   * free sequence number of that target directory.
+   *
+   * @param fs FileSystem
+   * @param stagingResultDir The staging result dir
+   * @param fileStatus The file status of the staged file or directory to move
+   * @param finalOutputPath Final output path
+   * @param nf Number format
+   * @param fileSeq The sequence number (only used for files, and only if {@code changeFileSeq} is true)
+   * @param changeFileSeq If true, the sequence part of the file name is replaced by {@code fileSeq}
+   * @throws java.io.IOException if a staged directory is not located below {@code stagingResultDir},
+   *         if the target file already exists, or if a file system call fails
+   */
+  private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
+                                          FileStatus fileStatus, Path finalOutputPath,
+                                          NumberFormat nf,
+                                          int fileSeq, boolean changeFileSeq) throws IOException {
+    Path stagedPath = fileStatus.getPath();
+    // Location of the staged entry relative to the staging result dir; null if it lies elsewhere.
+    String subPath = extractSubPath(stagingResultDir, stagedPath);
+
+    if (fileStatus.isDirectory()) {
+      if (subPath == null) {
+        throw new IOException("Wrong staging dir:" + stagingResultDir + "," + stagedPath);
+      }
+      Path finalSubPath = new Path(finalOutputPath, subPath);
+      if (!fs.exists(finalSubPath)) {
+        fs.mkdirs(finalSubPath);
+      }
+      int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
+      for (FileStatus eachFile : fs.listStatus(stagedPath)) {
+        if (eachFile.getPath().getName().startsWith("_")) {
+          continue;
+        }
+        moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq);
+      }
+      return;
+    }
+
+    if (subPath == null) {
+      // As before, such a file is left where it is; the skip is now visible in the log.
+      LOG.warn("Skipping a file outside of the staging dir:" + stagingResultDir + "," + stagedPath);
+      return;
+    }
+
+    Path finalSubPath = new Path(finalOutputPath, subPath);
+    if (changeFileSeq) {
+      finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
+    }
+    if (!fs.exists(finalSubPath.getParent())) {
+      fs.mkdirs(finalSubPath.getParent());
+    }
+    if (fs.exists(finalSubPath)) {
+      throw new IOException("Already exists data file:" + finalSubPath);
+    }
+
+    // A failed rename is only logged here; the caller checks for files left in the staging dir afterwards.
+    boolean success = fs.rename(stagedPath, finalSubPath);
+    if (success) {
+      LOG.info("Moving staging file[" + stagedPath + "] " +
+        "to final output[" + finalSubPath + "]");
+    } else {
+      LOG.error("Can't move staging file[" + stagedPath + "] " +
+        "to final output[" + finalSubPath + "]");
+    }
+  }
+
+  /**
+   * Removes the path of the parent, i.e. returns the location of {@code childPath} relative to
+   * {@code parentPath}. Only the path components of the two URIs are compared.
+   *
+   * @param parentPath ancestor directory
+   * @param childPath path expected to lie strictly below {@code parentPath}
+   * @return the relative path without a leading slash, or {@code null} if {@code childPath} is not
+   *         strictly below {@code parentPath} (this includes the case that both are equal)
+   */
+  private String extractSubPath(Path parentPath, Path childPath) {
+    String parentPathStr = parentPath.toUri().getPath();
+    String childPathStr = childPath.toUri().getPath();
+
+    // Compare whole path components: "/a/b" must not be accepted as a parent of "/a/bc".
+    String prefix = parentPathStr.endsWith("/") ? parentPathStr : parentPathStr + "/";
+
+    // The length check also rejects childPath == parentPath, for which the former
+    // substring(parent.length() + 1) failed with a StringIndexOutOfBoundsException.
+    if (childPathStr.length() <= prefix.length() || !childPathStr.startsWith(prefix)) {
+      return null;
+    }
+
+    return childPathStr.substring(prefix.length());
+  }
+
+  /**
+   * Builds a file name in which the sequence part of the given path's name is replaced.
+   * The name must consist of exactly four parts separated by "-"; the first three are kept and the
+   * fourth is replaced by the formatted sequence number.
+   *
+   * @param path Path whose file name is used
+   * @param seq sequence number
+   * @param nf Number format applied to {@code seq}
+   * @return New file name (not a full path) carrying the sequence number
+   * @throws java.io.IOException if the file name does not consist of four "-"-separated parts
+   */
+  private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
+    String[] nameParts = path.getName().split("-");
+    if (nameParts.length != 4) {
+      throw new IOException("Wrong result file name:" + path);
+    }
+
+    StringBuilder renamed = new StringBuilder();
+    for (int i = 0; i < 3; i++) {
+      renamed.append(nameParts[i]).append("-");
+    }
+    renamed.append(nf.format(seq));
+    return renamed.toString();
+  }
+
+  /**
+   * Make sure all files are moved. Walks the directory tree below the staging path; every
+   * sub-directory that turns out to contain no files is deleted on the way (non-recursively).
+   * The walk stops at the first file that is still present.
+   *
+   * @param fs FileSystem
+   * @param stagingPath The staging directory
+   * @return true if no file is left below {@code stagingPath}, false as soon as one is found
+   * @throws java.io.IOException
+   */
+  private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
+    FileStatus[] entries = fs.listStatus(stagingPath);
+    if (entries == null) {
+      return true;
+    }
+
+    for (FileStatus entry : entries) {
+      if (entry.isFile()) {
+        LOG.error("There are some unmoved files in staging dir:" + entry.getPath());
+        return false;
+      }
+      if (!verifyAllFileMoved(fs, entry.getPath())) {
+        return false;
+      }
+      // the sub-directory holds no files any more, so it can go
+      fs.delete(entry.getPath(), false);
+    }
+
+    return true;
+  }
+
+  /**
+   * Walks the staged partition directories recursively and fills the rename map, which maps a staged
+   * directory to its location below the final output directory.
+   *
+   * <p>For every staged directory a directory with the same relative path is created below
+   * {@code oldTableDir} (the backup location). A staged directory without sub-directories is put into
+   * {@code renameDirs}; for a staged directory that has sub-directories only its counterpart below
+   * {@code outputPath} is created if missing. No data file is moved or deleted by this method.
+   *
+   * @param fs file system holding the staging and output directories
+   * @param stagingPath staged directory to visit
+   * @param outputPath final output directory
+   * @param stagingParentPathString string form of the staging result directory; replaced in path
+   *                                strings to derive the corresponding backup and output paths
+   * @param renameDirs result map: staged directory to final directory
+   * @param oldTableDir root of the backup location
+   * @throws java.io.IOException
+   */
+  private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
+                                         String stagingParentPathString,
+                                         Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
+    FileStatus[] files = fs.listStatus(stagingPath);
+
+    for(FileStatus eachFile : files) {
+      if (eachFile.isDirectory()) {
+        Path oldPath = eachFile.getPath();
+
+        // Make recover directory.
+        // String.replace() substitutes literally; replaceAll() would interpret the path as a regular
+        // expression and '$' or '\' in the replacement as group references.
+        String recoverPathString = oldPath.toString().replace(stagingParentPathString,
+          oldTableDir.toString());
+        Path recoveryPath = new Path(recoverPathString);
+        if (!fs.exists(recoveryPath)) {
+          fs.mkdirs(recoveryPath);
+        }
+
+        visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString,
+          renameDirs, oldTableDir);
+        // Find last order partition for renaming
+        String newPathString = oldPath.toString().replace(stagingParentPathString,
+          outputPath.toString());
+        Path newPath = new Path(newPathString);
+        // Despite its name, isLeafDirectory() is true when the directory HAS sub-directories,
+        // so this branch is taken for the last-level partition directories.
+        if (!isLeafDirectory(fs, eachFile.getPath())) {
+          renameDirs.put(eachFile.getPath(), newPath);
+        } else {
+          if (!fs.exists(newPath)) {
+            fs.mkdirs(newPath);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Tells whether the given directory contains at least one sub-directory.
+   *
+   * <p>NOTE(review): the result is the opposite of what the name suggests - {@code true} means the
+   * directory is NOT a leaf. The caller negates the result, so renaming or inverting this method
+   * requires changing the caller at the same time.
+   *
+   * @param fs file system to query
+   * @param path directory to inspect
+   * @return true if any direct child of {@code path} is a directory, false otherwise
+   * @throws IOException
+   */
+  private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
+    for (FileStatus child : fs.listStatus(path)) {
+      if (fs.isDirectory(child.getPath())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java
new file mode 100644
index 0000000..15c6a33
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.s3;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A stub {@link FileSystem} for tests that reports the {@code s3} scheme.
+ * It stores nothing: every data operation returns a fixed value ({@code null}, {@code false}
+ * or an empty array), and only the URI given to {@link #initialize} is remembered.
+ */
+public class MockS3FileSystem extends FileSystem {
+  // scheme and authority of the URI passed to initialize()
+  private URI uri;
+
+  /** Initializes the base class and keeps {@code scheme://authority} of the given URI. */
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    super.initialize(uri, conf);
+
+    setConf(conf);
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+  }
+
+
+  /**
+   * Return the protocol scheme for the FileSystem.
+   * <p/>
+   *
+   * @return <code>s3</code>
+   */
+  @Override
+  public String getScheme() {
+    return "s3";
+  }
+
+  /** Returns the URI built in {@link #initialize}. */
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  /** Returns the given path unchanged; no qualification is performed. */
+  @Override
+  public Path makeQualified(Path path) {
+    return path;
+  }
+
+  /** Stub: always returns {@code null}. */
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    return null;
+  }
+
+  /** Stub: always returns {@code null}. */
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
+    short replication, long blockSize, Progressable progress) throws IOException {
+    return null;
+  }
+
+  /** Stub: always returns {@code null}. */
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+    return null;
+  }
+
+  /** Stub: nothing is renamed; always returns {@code false}. */
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    return false;
+  }
+
+  /** Stub: nothing is deleted; always returns {@code false}. */
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    return false;
+  }
+
+  /** Stub: always returns an empty array. */
+  @Override
+  public FileStatus[] listStatus(Path f) throws IOException {
+    return new FileStatus[0];
+  }
+
+  /** Stub: the working directory is not tracked. */
+  @Override
+  public void setWorkingDirectory(Path new_dir) {
+  }
+
+  /** Stub: always returns {@code null}. */
+  @Override
+  public Path getWorkingDirectory() {
+    return null;
+  }
+
+  /** Stub: nothing is created; always returns {@code false}. */
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    return false;
+  }
+
+  /** Stub: always returns {@code null}. */
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3852ae3d/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
new file mode 100644
index 0000000..2d06778
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import net.minidev.json.JSONObject;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.TablespaceManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks that an {@link S3TableSpace} registered in the {@link TablespaceManager} can be looked up
+ * both by its name and by its URI. The {@code s3} scheme is served by {@link MockS3FileSystem}.
+ */
+public class TestS3TableSpace {
+  public static final String SPACENAME = "s3_cluster";
+  public static final String S3_URI = "s3://tajo-test/";
+
+  /** Creates the tablespace on top of the mock file system and registers it for the tests. */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    S3TableSpace tablespace = new S3TableSpace(SPACENAME, URI.create(S3_URI), new JSONObject());
+
+    TajoConf tajoConf = new TajoConf();
+    // route the "s3" scheme to the mock file system
+    tajoConf.set("fs.s3.impl", MockS3FileSystem.class.getName());
+    tablespace.init(tajoConf);
+
+    TablespaceManager.addTableSpaceForTest(tablespace);
+  }
+
+  /** Removes the tablespace registered in {@link #setUp()}. */
+  @AfterClass
+  public static void tearDown() throws IOException {
+    TablespaceManager.removeTablespaceForTest(SPACENAME);
+  }
+
+  /** The registered space must be found by name and by URI, and report the name and URI it was given. */
+  @Test
+  public void testTablespaceHandler() throws Exception {
+    assertTrue((TablespaceManager.getByName(SPACENAME)) instanceof S3TableSpace);
+    assertEquals(SPACENAME, (TablespaceManager.getByName(SPACENAME).getName()));
+
+    assertTrue((TablespaceManager.get(S3_URI)) instanceof S3TableSpace);
+    assertEquals(S3_URI, TablespaceManager.get(S3_URI).getUri().toASCIIString());
+  }
+}