You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2016/08/29 14:12:30 UTC

tajo git commit: TAJO-2069: Implement finding the total size of all objects in a bucket with AWS SDK.

Repository: tajo
Updated Branches:
  refs/heads/master 0ef485ba7 -> 4f35c28e8


TAJO-2069: Implement finding the total size of all objects in a bucket with AWS SDK.

Closes #1024


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4f35c28e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4f35c28e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4f35c28e

Branch: refs/heads/master
Commit: 4f35c28e8e0281beed0a457244fbb9144e791fee
Parents: 0ef485b
Author: JaeHwa Jung <bl...@apache.org>
Authored: Mon Aug 29 23:07:31 2016 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Mon Aug 29 23:07:31 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 tajo-project/pom.xml                            |   2 +
 .../org/apache/tajo/storage/FileTablespace.java |  13 +-
 tajo-storage/tajo-storage-s3/pom.xml            |  88 +--
 .../s3/AnonymousAWSCredentialsProvider.java     |  41 ++
 .../storage/s3/BasicAWSCredentialsProvider.java |  55 ++
 .../apache/tajo/storage/s3/S3TableSpace.java    | 211 +++++++
 .../apache/tajo/storage/s3/TajoS3Constants.java |  69 +++
 .../apache/tajo/storage/s3/MockAmazonS3.java    | 616 +++++++++++++++++++
 .../tajo/storage/s3/MockObjectListing.java      |  48 ++
 .../tajo/storage/s3/MockS3FileSystem.java       |  21 +-
 .../tajo/storage/s3/TestS3TableSpace.java       |  91 ++-
 12 files changed, 1166 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 536c728..f6c5458 100644
--- a/CHANGES
+++ b/CHANGES
@@ -413,6 +413,9 @@ Release 0.12.0 - unreleased
 
   SUB TASKS
 
+    TAJO-2069: Implement finding the total size of all objects in a bucket
+    with AWS SDK. (jaehwa)
+
     TAJO-1488: Implement KafkaTablespace. (Byunghwa Yun via jinho)
 
     TAJO-1487: Kafka Scanner for kafka strage. (Byunghwa Yun via jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index e6a018c..c259c7a 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -41,6 +41,8 @@
     <jetty.version>6.1.26</jetty.version>
     <parquet.version>1.8.1</parquet.version>
     <kafka.version>0.10.0.1</kafka.version>
+    <aws-java-sdk.version>1.7.4</aws-java-sdk.version>
+    <httpclient.version>4.5</httpclient.version>
     <tajo.root>${project.parent.relativePath}/..</tajo.root>
     <extra.source.path>src/main/hadoop-${hadoop.version}</extra.source.path>
   </properties>

http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index 2785de4..dd7db31 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -128,13 +128,13 @@ public class FileTablespace extends Tablespace {
   @Override
   public long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
     Path path = new Path(table.getUri());
-    ContentSummary summary;
+    long totalVolume = 0L;
     try {
-      summary = fs.getContentSummary(path);
+      totalVolume = calculateSize(path);
     } catch (IOException e) {
       throw new TajoInternalError(e);
     }
-    return summary.getLength();
+    return totalVolume;
   }
 
   @Override
@@ -246,6 +246,13 @@ public class FileTablespace extends Tablespace {
     return tablets;
   }
 
+  /**
+   * Calculate the total size of all files in the indicated Path
+   *
+   * @param path to use
+   * @return calculated size
+   * @throws IOException
+   */
   public long calculateSize(Path tablePath) throws IOException {
     FileSystem fs = tablePath.getFileSystem(conf);
     long totalSize = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/pom.xml b/tajo-storage/tajo-storage-s3/pom.xml
index a9a541a..6956411 100644
--- a/tajo-storage/tajo-storage-s3/pom.xml
+++ b/tajo-storage/tajo-storage-s3/pom.xml
@@ -61,13 +61,6 @@
       <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
-        <configuration>
-          <excludes>
-            <exclude>src/test/resources/dataset/**</exclude>
-            <exclude>src/test/resources/queries/**</exclude>
-            <exclude>src/test/resources/results/**</exclude>
-          </excludes>
-        </configuration>
         <executions>
           <execution>
             <phase>verify</phase>
@@ -89,13 +82,9 @@
           </execution>
         </executions>
       </plugin>
-
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <skipTests>true</skipTests>
-        </configuration>
+        <artifactId>maven-surefire-report-plugin</artifactId>
       </plugin>
     </plugins>
   </build>
@@ -116,84 +105,41 @@
       <artifactId>tajo-storage-hdfs</artifactId>
       <scope>provided</scope>
     </dependency>
-
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <artifactId>zookeeper</artifactId>
-          <groupId>org.apache.zookeeper</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>slf4j-api</artifactId>
-          <groupId>org.slf4j</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>jersey-json</artifactId>
-          <groupId>com.sun.jersey</groupId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
       <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>commons-el</groupId>
-          <artifactId>commons-el</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-runtime</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-compiler</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jsp-2.1-jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.sun.jersey.jersey-test-framework</groupId>
-          <artifactId>jersey-test-framework-grizzly2</artifactId>
-        </exclusion>
-        <exclusion>
-          <artifactId>netty-all</artifactId>
-          <groupId>io.netty</groupId>
-        </exclusion>
-      </exclusions>
     </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk</artifactId>
+      <version>${aws-java-sdk.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>${httpclient.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+
   </dependencies>
 
   <profiles>
     <profile>
-      <!-- Run unit tests in tajo-storage-s3, whereas it is disabled as by default. -->
-      <id>test-storage-s3</id>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <configuration combine.self="override">
-              <systemProperties>
-                <tajo.test.enabled>TRUE</tajo.test.enabled>
-              </systemProperties>
-              <argLine>-Xms128m -Xmx1024m -Dfile.encoding=UTF-8</argLine>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
       <id>docs</id>
       <activation>
         <activeByDefault>false</activeByDefault>

http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/AnonymousAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/AnonymousAWSCredentialsProvider.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/AnonymousAWSCredentialsProvider.java
new file mode 100644
index 0000000..a884ca8
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/AnonymousAWSCredentialsProvider.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AnonymousAWSCredentials;
+import com.amazonaws.auth.AWSCredentials;
+
+/**
+ * Borrow from org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider.
+ *
+ */
+public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider {
+  public AWSCredentials getCredentials() {
+    return new AnonymousAWSCredentials();
+  }
+
+  public void refresh() {}
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/BasicAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/BasicAWSCredentialsProvider.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/BasicAWSCredentialsProvider.java
new file mode 100644
index 0000000..0f4fbde
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/BasicAWSCredentialsProvider.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Borrow from org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider.
+ *
+ */
+public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
+  private final String accessKey;
+  private final String secretKey;
+
+  public BasicAWSCredentialsProvider(String accessKey, String secretKey) {
+    this.accessKey = accessKey;
+    this.secretKey = secretKey;
+  }
+
+  public AWSCredentials getCredentials() {
+    if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
+      return new BasicAWSCredentials(accessKey, secretKey);
+    }
+    throw new AmazonClientException(
+      "Access key or secret key is null");
+  }
+
+  public void refresh() {}
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
index 4bcdb60..ee6af31 100644
--- a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
+++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
@@ -18,14 +18,225 @@
 
 package org.apache.tajo.storage.s3;
 
+import java.io.IOException;
 import java.net.URI;
 
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.*;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.storage.FileTablespace;
 
 import net.minidev.json.JSONObject;
 
+import static org.apache.tajo.storage.s3.TajoS3Constants.*;
+
 public class S3TableSpace extends FileTablespace {
+  private final static Log LOG = LogFactory.getLog(S3TableSpace.class);
+
+  private AmazonS3 s3;
+  private boolean s3Enabled;
+  private int maxKeys;
+
   public S3TableSpace(String spaceName, URI uri, JSONObject config) {
     super(spaceName, uri, config);
   }
+
+  @Override
+  public void init(TajoConf tajoConf) throws IOException {
+    super.init(tajoConf);
+
+    try {
+      // Try to get our credentials or just connect anonymously
+      String accessKey = conf.get(ACCESS_KEY, null);
+      String secretKey = conf.get(SECRET_KEY, null);
+
+      String userInfo = uri.getUserInfo();
+      if (userInfo != null) {
+        int index = userInfo.indexOf(':');
+        if (index != -1) {
+          accessKey = userInfo.substring(0, index);
+          secretKey = userInfo.substring(index + 1);
+        } else {
+          accessKey = userInfo;
+        }
+      }
+
+      AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
+        new BasicAWSCredentialsProvider(accessKey, secretKey),
+        new InstanceProfileCredentialsProvider(),
+        new AnonymousAWSCredentialsProvider()
+      );
+
+      ClientConfiguration awsConf = new ClientConfiguration();
+      awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS));
+      boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS);
+      awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
+      awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES));
+      awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT, DEFAULT_ESTABLISH_TIMEOUT));
+      awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT));
+
+      String proxyHost = conf.getTrimmed(PROXY_HOST,"");
+      int proxyPort = conf.getInt(PROXY_PORT, -1);
+      if (!proxyHost.isEmpty()) {
+        awsConf.setProxyHost(proxyHost);
+        if (proxyPort >= 0) {
+          awsConf.setProxyPort(proxyPort);
+        } else {
+          if (secureConnections) {
+            LOG.warn("Proxy host set without port. Using HTTPS default 443");
+            awsConf.setProxyPort(443);
+          } else {
+            LOG.warn("Proxy host set without port. Using HTTP default 80");
+            awsConf.setProxyPort(80);
+          }
+        }
+        String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
+        String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
+        if ((proxyUsername == null) != (proxyPassword == null)) {
+          String msg = "Proxy error: " + PROXY_USERNAME + " or " + PROXY_PASSWORD + " set without the other.";
+          LOG.error(msg);
+        }
+        awsConf.setProxyUsername(proxyUsername);
+        awsConf.setProxyPassword(proxyPassword);
+        awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
+        awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Using proxy server %s:%d as user %s with password %s on domain %s as workstation " +
+            "%s", awsConf.getProxyHost(), awsConf.getProxyPort(), awsConf.getProxyUsername(),
+            awsConf.getProxyPassword(), awsConf.getProxyDomain(), awsConf.getProxyWorkstation()));
+        }
+      } else if (proxyPort >= 0) {
+        String msg = "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
+        LOG.error(msg);
+      }
+
+      s3 = new AmazonS3Client(credentials, awsConf);
+      String endPoint = conf.getTrimmed(ENDPOINT,"");
+      if (!endPoint.isEmpty()) {
+        try {
+          s3.setEndpoint(endPoint);
+        } catch (IllegalArgumentException e) {
+          String msg = "Incorrect endpoint: "  + e.getMessage();
+          LOG.error(msg);
+        }
+      }
+
+      maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
+      s3Enabled = true;
+    } catch (NoClassDefFoundError e) {
+      // If the version of hadoop is less than 2.6.0, hadoop doesn't include aws dependencies because it doesn't provide
+      // S3AFileSystem. In this case, tajo never uses aws s3 api directly.
+      LOG.warn(e);
+      s3Enabled = false;
+    } catch (Exception e) {
+      throw new TajoInternalError(e);
+    }
+  }
+
+  /**
+   * Calculate the total size of all objects in the indicated bucket
+   *
+   * @param path to use
+   * @return calculated size
+   * @throws IOException
+   */
+  @Override
+  public long calculateSize(Path path) throws IOException {
+    long totalBucketSize = 0L;
+
+    if (s3Enabled) {
+      String key = pathToKey(path);
+
+      final FileStatus fileStatus =  fs.getFileStatus(path);
+
+      if (fileStatus.isDirectory()) {
+        if (!key.isEmpty()) {
+          key = key + "/";
+        }
+
+        ListObjectsRequest request = new ListObjectsRequest();
+        request.setBucketName(uri.getHost());
+        request.setPrefix(key);
+        request.setMaxKeys(maxKeys);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("listStatus: doing listObjects for directory " + key);
+        }
+
+        ObjectListing objects = s3.listObjects(request);
+
+        while (true) {
+          for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+            Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, fs.getWorkingDirectory());
+
+            // Skip over keys that are ourselves and old S3N _$folder$ files
+            if (keyPath.equals(path) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Ignoring: " + keyPath);
+              }
+              continue;
+            }
+
+            if (!objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
+              totalBucketSize += summary.getSize();
+            }
+          }
+
+          if (objects.isTruncated()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("listStatus: list truncated - getting next batch");
+            }
+            objects = s3.listNextBatchOfObjects(objects);
+          } else {
+            break;
+          }
+        }
+      } else {
+        return fileStatus.getLen();
+      }
+    } else {
+      totalBucketSize = fs.getContentSummary(path).getLength();
+    }
+
+    return totalBucketSize;
+  }
+
+  private boolean objectRepresentsDirectory(final String name, final long size) {
+    return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L;
+  }
+
+  private Path keyToPath(String key) {
+    return new Path("/" + key);
+  }
+
+  /* Turns a path (relative or otherwise) into an S3 key
+   */
+  private String pathToKey(Path path) {
+    if (!path.isAbsolute()) {
+      path = new Path(fs.getWorkingDirectory(), path);
+    }
+
+    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
+      return "";
+    }
+
+    return path.toUri().getPath().substring(1);
+  }
+
+  @VisibleForTesting
+  public void setAmazonS3Client(AmazonS3 s3) {
+    this.s3 = s3;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Constants.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Constants.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Constants.java
new file mode 100644
index 0000000..48f76b8
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Constants.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+/**
+ * Borrow from org.apache.hadoop.fs.s3a.TajoS3Constants.
+ *
+ */
+public class TajoS3Constants {
+  // s3 access key
+  public static final String ACCESS_KEY = "fs.s3a.access.key";
+
+  // s3 secret key
+  public static final String SECRET_KEY = "fs.s3a.secret.key";
+
+  //use a custom endpoint?
+  public static final String ENDPOINT = "fs.s3a.endpoint";
+
+  // number of times we should retry errors
+  public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
+  public static final int DEFAULT_MAX_ERROR_RETRIES = 10;
+
+  // connect to s3 over ssl?
+  public static final String SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
+  public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
+
+  // seconds until we give up trying to establish a connection to s3
+  public static final String ESTABLISH_TIMEOUT = "fs.s3a.connection.establish.timeout";
+  public static final int DEFAULT_ESTABLISH_TIMEOUT = 50000;
+
+  // seconds until we give up on a connection to s3
+  public static final String SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
+  public static final int DEFAULT_SOCKET_TIMEOUT = 50000;
+
+  // number of simultaneous connections to s3
+  public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
+  public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
+
+  // number of records to get while paging through a directory listing
+  public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
+  public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
+
+  public static final String S3N_FOLDER_SUFFIX = "_$folder$";
+
+  //connect to s3 through a proxy server?
+  public static final String PROXY_HOST = "fs.s3a.proxy.host";
+  public static final String PROXY_PORT = "fs.s3a.proxy.port";
+  public static final String PROXY_USERNAME = "fs.s3a.proxy.username";
+  public static final String PROXY_PASSWORD = "fs.s3a.proxy.password";
+  public static final String PROXY_DOMAIN = "fs.s3a.proxy.domain";
+  public static final String PROXY_WORKSTATION = "fs.s3a.proxy.workstation";
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockAmazonS3.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockAmazonS3.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockAmazonS3.java
new file mode 100644
index 0000000..a0d8c26
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockAmazonS3.java
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.HttpMethod;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.S3ClientOptions;
+import com.amazonaws.services.s3.S3ResponseMetadata;
+import com.amazonaws.services.s3.model.*;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Date;
+import java.util.List;
+
+import static org.apache.http.HttpStatus.SC_OK;
+
+public class MockAmazonS3 implements AmazonS3 {
+  private int getObjectHttpCode = SC_OK;
+  private int getObjectMetadataHttpCode = SC_OK;
+
+  @Override
+  public void setEndpoint(String endpoint) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setRegion(Region region)
+    throws IllegalArgumentException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setS3ClientOptions(S3ClientOptions clientOptions) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void changeObjectStorageClass(String bucketName, String key, StorageClass newStorageClass)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setObjectRedirectLocation(String bucketName, String key, String newRedirectLocation)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public ObjectListing listObjects(String bucketName) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public ObjectListing listObjects(String bucketName, String prefix)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
+    throws AmazonClientException {
+    if (listObjectsRequest.getBucketName().equals("tajo-test") && listObjectsRequest.getPrefix().equals("test/")) {
+      MockObjectListing objectListing = new MockObjectListing();
+      return objectListing;
+    } else {
+      throw new TajoInternalError(new UnsupportedException());
+    }
+  }
+
+  @Override
+  public ObjectListing listNextBatchOfObjects(ObjectListing previousObjectListing)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public VersionListing listVersions(String bucketName, String prefix)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public VersionListing listNextBatchOfVersions(VersionListing previousVersionListing)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public VersionListing listVersions(String bucketName, String prefix, String keyMarker, String versionIdMarker,
+    String delimiter, Integer maxResults) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public VersionListing listVersions(ListVersionsRequest listVersionsRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public Owner getS3AccountOwner()
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public boolean doesBucketExist(String bucketName)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public List<Bucket> listBuckets()
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public List<Bucket> listBuckets(ListBucketsRequest listBucketsRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public String getBucketLocation(String bucketName)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public String getBucketLocation(GetBucketLocationRequest getBucketLocationRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public Bucket createBucket(CreateBucketRequest createBucketRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public Bucket createBucket(String bucketName)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public Bucket createBucket(String bucketName, com.amazonaws.services.s3.model.Region region)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public Bucket createBucket(String bucketName, String region)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public AccessControlList getObjectAcl(String bucketName, String key)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public AccessControlList getObjectAcl(String bucketName, String key, String versionId)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setObjectAcl(String bucketName, String key, AccessControlList acl)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setObjectAcl(String bucketName, String key, CannedAccessControlList acl)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setObjectAcl(String bucketName, String key, String versionId, AccessControlList acl)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setObjectAcl(String bucketName, String key, String versionId, CannedAccessControlList acl)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public AccessControlList getBucketAcl(String bucketName)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketAcl(SetBucketAclRequest setBucketAclRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public AccessControlList getBucketAcl(GetBucketAclRequest getBucketAclRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketAcl(String bucketName, AccessControlList acl)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketAcl(String bucketName, CannedAccessControlList acl)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public ObjectMetadata getObjectMetadata(String bucketName, String key) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public S3Object getObject(String bucketName, String key)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public S3Object getObject(GetObjectRequest getObjectRequest) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public ObjectMetadata getObject(GetObjectRequest getObjectRequest, File destinationFile)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteBucket(DeleteBucketRequest deleteBucketRequest) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteBucket(String bucketName) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public PutObjectResult putObject(String bucketName, String key, File file) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public CopyObjectResult copyObject(String sourceBucketName, String sourceKey, String destinationBucketName,
+    String destinationKey) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public CopyPartResult copyPart(CopyPartRequest copyPartRequest) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteObject(String bucketName, String key) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteObject(DeleteObjectRequest deleteObjectRequest) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteObjectsRequest) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteVersion(String bucketName, String key, String versionId) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteVersion(DeleteVersionRequest deleteVersionRequest) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public BucketLoggingConfiguration getBucketLoggingConfiguration(String bucketName) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketLoggingConfiguration(SetBucketLoggingConfigurationRequest setBucketLoggingConfigurationRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public BucketVersioningConfiguration getBucketVersioningConfiguration(String bucketName)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketVersioningConfiguration(SetBucketVersioningConfigurationRequest
+    setBucketVersioningConfigurationRequest) throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public BucketLifecycleConfiguration getBucketLifecycleConfiguration(String bucketName) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketLifecycleConfiguration(String bucketName,
+                                              BucketLifecycleConfiguration bucketLifecycleConfiguration) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketLifecycleConfiguration(SetBucketLifecycleConfigurationRequest
+                                                  setBucketLifecycleConfigurationRequest) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteBucketLifecycleConfiguration(String bucketName) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteBucketLifecycleConfiguration(DeleteBucketLifecycleConfigurationRequest
+                                                     deleteBucketLifecycleConfigurationRequest) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public BucketCrossOriginConfiguration getBucketCrossOriginConfiguration(String bucketName) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketCrossOriginConfiguration(String bucketName, BucketCrossOriginConfiguration
+    bucketCrossOriginConfiguration) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketCrossOriginConfiguration(SetBucketCrossOriginConfigurationRequest
+                                                    setBucketCrossOriginConfigurationRequest) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteBucketCrossOriginConfiguration(String bucketName) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteBucketCrossOriginConfiguration(DeleteBucketCrossOriginConfigurationRequest
+                                                       deleteBucketCrossOriginConfigurationRequest) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public BucketTaggingConfiguration getBucketTaggingConfiguration(String bucketName) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketTaggingConfiguration(String bucketName, BucketTaggingConfiguration bucketTaggingConfiguration) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketTaggingConfiguration(SetBucketTaggingConfigurationRequest setBucketTaggingConfigurationRequest) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteBucketTaggingConfiguration(String bucketName) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteBucketTaggingConfiguration(DeleteBucketTaggingConfigurationRequest
+                                                   deleteBucketTaggingConfigurationRequest) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public BucketNotificationConfiguration getBucketNotificationConfiguration(String bucketName)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketNotificationConfiguration(SetBucketNotificationConfigurationRequest
+                                                     setBucketNotificationConfigurationRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketNotificationConfiguration(String bucketName, BucketNotificationConfiguration
+    bucketNotificationConfiguration)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public BucketWebsiteConfiguration getBucketWebsiteConfiguration(String bucketName)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public BucketWebsiteConfiguration getBucketWebsiteConfiguration(GetBucketWebsiteConfigurationRequest
+                                                                      getBucketWebsiteConfigurationRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketWebsiteConfiguration(String bucketName, BucketWebsiteConfiguration configuration)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketWebsiteConfiguration(SetBucketWebsiteConfigurationRequest setBucketWebsiteConfigurationRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteBucketWebsiteConfiguration(String bucketName)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteBucketWebsiteConfiguration(DeleteBucketWebsiteConfigurationRequest
+                                                   deleteBucketWebsiteConfigurationRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public BucketPolicy getBucketPolicy(String bucketName)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public BucketPolicy getBucketPolicy(GetBucketPolicyRequest getBucketPolicyRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketPolicy(String bucketName, String policyText)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void setBucketPolicy(SetBucketPolicyRequest setBucketPolicyRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteBucketPolicy(String bucketName)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void deleteBucketPolicy(DeleteBucketPolicyRequest deleteBucketPolicyRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public URL generatePresignedUrl(String bucketName, String key, Date expiration)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public URL generatePresignedUrl(String bucketName, String key, Date expiration, HttpMethod method)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public URL generatePresignedUrl(GeneratePresignedUrlRequest generatePresignedUrlRequest)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest request)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public UploadPartResult uploadPart(UploadPartRequest request)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public PartListing listParts(ListPartsRequest request)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void abortMultipartUpload(AbortMultipartUploadRequest request)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest request)
+    throws AmazonClientException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public S3ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void restoreObject(RestoreObjectRequest request)
+    throws AmazonServiceException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+
+  @Override
+  public void restoreObject(String bucketName, String key, int expirationInDays)
+    throws AmazonServiceException {
+    throw new TajoInternalError(new UnsupportedException());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockObjectListing.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockObjectListing.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockObjectListing.java
new file mode 100644
index 0000000..96a7f80
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockObjectListing.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class MockObjectListing extends ObjectListing {
+
+  @Override
+  public List<S3ObjectSummary> getObjectSummaries() {
+    final String bucketName = "tajo-test";
+
+    List<S3ObjectSummary> objectSummaries = Lists.newArrayList();
+    objectSummaries.add(getS3ObjectSummary(bucketName, "test/data01", 10L));
+    objectSummaries.add(getS3ObjectSummary(bucketName, "test/data02", 10L));
+    objectSummaries.add(getS3ObjectSummary(bucketName, "test/data03", 10L));
+
+    return objectSummaries;
+  }
+
+  private S3ObjectSummary getS3ObjectSummary(String bucketName, String key, long size) {
+    S3ObjectSummary objectSummary = new S3ObjectSummary();
+    objectSummary.setBucketName(bucketName);
+    objectSummary.setKey(key);
+    objectSummary.setSize(size);
+    return objectSummary;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java
index 15c6a33..6526269 100644
--- a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java
+++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.UnsupportedException;
 
 import java.io.IOException;
 import java.net.URI;
@@ -36,7 +38,6 @@ public class MockS3FileSystem extends FileSystem {
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
   }
 
-
   /**
    * Return the protocol scheme for the FileSystem.
    * <p/>
@@ -60,18 +61,18 @@ public class MockS3FileSystem extends FileSystem {
 
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-    return null;
+    throw new TajoInternalError(new UnsupportedException());
   }
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
     short replication, long blockSize, Progressable progress) throws IOException {
-    return null;
+    throw new TajoInternalError(new UnsupportedException());
   }
 
   @Override
   public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
-    return null;
+    throw new TajoInternalError(new UnsupportedException());
   }
 
   @Override
@@ -95,7 +96,7 @@ public class MockS3FileSystem extends FileSystem {
 
   @Override
   public Path getWorkingDirectory() {
-    return null;
+    return new Path(uri);
   }
 
   @Override
@@ -105,6 +106,12 @@ public class MockS3FileSystem extends FileSystem {
 
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
-    return null;
+    if (f.equals(new Path(TestS3TableSpace.S3_URI, "test"))
+      || f.equals(new Path(TestS3TableSpace.S3N_URI, "test"))
+      || f.equals(new Path(TestS3TableSpace.S3A_URI, "test"))) {
+      return new FileStatus(0, true, 1, 0, 0, f);
+    } else {
+      throw new TajoInternalError(new UnsupportedException());
+    }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
index 2d06778..2b630c0 100644
--- a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
+++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.storage.s3;
 
 import net.minidev.json.JSONObject;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.TablespaceManager;
 import org.junit.AfterClass;
@@ -28,35 +29,103 @@ import org.junit.Test;
 import java.io.IOException;
 import java.net.URI;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class TestS3TableSpace {
-  public static final String SPACENAME = "s3_cluster";
+  public static final String S3_SPACENAME = "s3_cluster";
+  public static final String S3N_SPACENAME = "s3N_cluster";
+  public static final String S3A_SPACENAME = "s3A_cluster";
+
   public static final String S3_URI = "s3://tajo-test/";
+  public static final String S3N_URI = "s3n://tajo-test/";
+  public static final String S3A_URI = "s3a://tajo-test/";
 
   @BeforeClass
   public static void setUp() throws Exception {
-    S3TableSpace tablespace = new S3TableSpace(SPACENAME, URI.create(S3_URI), new JSONObject());
-
+    // Add tablespace for s3 prefix
+    S3TableSpace s3TableSpace = new S3TableSpace(S3_SPACENAME, URI.create(S3_URI), new JSONObject());
     TajoConf tajoConf = new TajoConf();
     tajoConf.set("fs.s3.impl", MockS3FileSystem.class.getName());
-    tablespace.init(tajoConf);
+    tajoConf.set("fs.s3.awsAccessKeyId", "test_access_key_id");
+    tajoConf.set("fs.s3.awsSecretAccessKey", "test_secret_access_key");
+    s3TableSpace.init(tajoConf);
+    TablespaceManager.addTableSpaceForTest(s3TableSpace);
 
-    TablespaceManager.addTableSpaceForTest(tablespace);
+    // Add tablespace for s3n prefix
+    S3TableSpace s3nTableSpace = new S3TableSpace(S3N_SPACENAME, URI.create(S3N_URI), new JSONObject());
+    tajoConf = new TajoConf();
+    tajoConf.set("fs.s3n.impl", MockS3FileSystem.class.getName());
+    tajoConf.set("fs.s3n.awsAccessKeyId", "test_access_key_id");
+    tajoConf.set("fs.s3n.awsSecretAccessKey", "test_secret_access_key");
+    s3nTableSpace.init(tajoConf);
+    TablespaceManager.addTableSpaceForTest(s3nTableSpace);
+
+    // Add tablespace for s3a prefix
+    S3TableSpace s3aTableSpace = new S3TableSpace(S3A_SPACENAME, URI.create(S3A_URI), new JSONObject());
+    tajoConf = new TajoConf();
+    tajoConf.set("fs.s3a.impl", MockS3FileSystem.class.getName());
+    tajoConf.set("fs.s3a.access.key", "test_access_key_id");
+    tajoConf.set("fs.s3a.secret.key", "test_secret_access_key");
+    s3aTableSpace.init(tajoConf);
+    TablespaceManager.addTableSpaceForTest(s3aTableSpace);
   }
 
   @AfterClass
   public static void tearDown() throws IOException {
-    TablespaceManager.removeTablespaceForTest(SPACENAME);
+    TablespaceManager.removeTablespaceForTest(S3_SPACENAME);
+    TablespaceManager.removeTablespaceForTest(S3N_SPACENAME);
+    TablespaceManager.removeTablespaceForTest(S3A_SPACENAME);
   }
 
   @Test
   public void testTablespaceHandler() throws Exception {
-    assertTrue((TablespaceManager.getByName(SPACENAME)) instanceof S3TableSpace);
-    assertEquals(SPACENAME, (TablespaceManager.getByName(SPACENAME).getName()));
-
+    // Verify the tablespace for s3 prefix
+    assertTrue((TablespaceManager.getByName(S3_SPACENAME)) instanceof S3TableSpace);
+    assertEquals(S3_SPACENAME, (TablespaceManager.getByName(S3_SPACENAME).getName()));
     assertTrue((TablespaceManager.get(S3_URI)) instanceof S3TableSpace);
     assertEquals(S3_URI, TablespaceManager.get(S3_URI).getUri().toASCIIString());
+
+    // Verify the tablespace for s3n prefix
+    assertTrue((TablespaceManager.getByName(S3N_SPACENAME)) instanceof S3TableSpace);
+    assertEquals(S3N_SPACENAME, (TablespaceManager.getByName(S3N_SPACENAME).getName()));
+    assertTrue((TablespaceManager.get(S3N_URI)) instanceof S3TableSpace);
+    assertEquals(S3N_URI, TablespaceManager.get(S3N_URI).getUri().toASCIIString());
+
+    // Verify the tablespace for s3a prefix
+    assertTrue((TablespaceManager.getByName(S3A_SPACENAME)) instanceof S3TableSpace);
+    assertEquals(S3A_SPACENAME, (TablespaceManager.getByName(S3A_SPACENAME).getName()));
+    assertTrue((TablespaceManager.get(S3A_URI)) instanceof S3TableSpace);
+    assertEquals(S3A_URI, TablespaceManager.get(S3A_URI).getUri().toASCIIString());
+  }
+
+  @Test
+  public void testCalculateSizeWithS3Prefix() throws Exception {
+    Path path = new Path(S3_URI, "test");
+    assertTrue((TablespaceManager.getByName(S3_SPACENAME)) instanceof S3TableSpace);
+    S3TableSpace tableSpace = TablespaceManager.get(path.toUri());
+    tableSpace.setAmazonS3Client(new MockAmazonS3());
+    long size = tableSpace.calculateSize(path);
+    assertEquals(30L, size);
+  }
+
+  @Test
+  public void testCalculateSizeWithS3NPrefix() throws Exception {
+    Path path = new Path(S3N_URI, "test");
+    assertTrue((TablespaceManager.getByName(S3N_SPACENAME)) instanceof S3TableSpace);
+    S3TableSpace tableSpace = TablespaceManager.get(path.toUri());
+    tableSpace.setAmazonS3Client(new MockAmazonS3());
+    long size = tableSpace.calculateSize(path);
+    assertEquals(30L, size);
   }
+
+  @Test
+  public void testCalculateSizeWithS3APrefix() throws Exception {
+    Path path = new Path(S3A_URI, "test");
+    assertTrue((TablespaceManager.getByName(S3A_SPACENAME)) instanceof S3TableSpace);
+    S3TableSpace tableSpace = TablespaceManager.get(path.toUri());
+    tableSpace.setAmazonS3Client(new MockAmazonS3());
+    long size = tableSpace.calculateSize(path);
+    assertEquals(30L, size);
+  }
+
 }