You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2016/08/29 14:12:30 UTC
tajo git commit: TAJO-2069: Implement finding the total size of all
objects in a bucket with AWS SDK.
Repository: tajo
Updated Branches:
refs/heads/master 0ef485ba7 -> 4f35c28e8
TAJO-2069: Implement finding the total size of all objects in a bucket with AWS SDK.
Closes #1024
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4f35c28e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4f35c28e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4f35c28e
Branch: refs/heads/master
Commit: 4f35c28e8e0281beed0a457244fbb9144e791fee
Parents: 0ef485b
Author: JaeHwa Jung <bl...@apache.org>
Authored: Mon Aug 29 23:07:31 2016 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Mon Aug 29 23:07:31 2016 +0900
----------------------------------------------------------------------
CHANGES | 3 +
tajo-project/pom.xml | 2 +
.../org/apache/tajo/storage/FileTablespace.java | 13 +-
tajo-storage/tajo-storage-s3/pom.xml | 88 +--
.../s3/AnonymousAWSCredentialsProvider.java | 41 ++
.../storage/s3/BasicAWSCredentialsProvider.java | 55 ++
.../apache/tajo/storage/s3/S3TableSpace.java | 211 +++++++
.../apache/tajo/storage/s3/TajoS3Constants.java | 69 +++
.../apache/tajo/storage/s3/MockAmazonS3.java | 616 +++++++++++++++++++
.../tajo/storage/s3/MockObjectListing.java | 48 ++
.../tajo/storage/s3/MockS3FileSystem.java | 21 +-
.../tajo/storage/s3/TestS3TableSpace.java | 91 ++-
12 files changed, 1166 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 536c728..f6c5458 100644
--- a/CHANGES
+++ b/CHANGES
@@ -413,6 +413,9 @@ Release 0.12.0 - unreleased
SUB TASKS
+ TAJO-2069: Implement finding the total size of all objects in a bucket
+ with AWS SDK. (jaehwa)
+
TAJO-1488: Implement KafkaTablespace. (Byunghwa Yun via jinho)
TAJO-1487: Kafka Scanner for kafka strage. (Byunghwa Yun via jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index e6a018c..c259c7a 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -41,6 +41,8 @@
<jetty.version>6.1.26</jetty.version>
<parquet.version>1.8.1</parquet.version>
<kafka.version>0.10.0.1</kafka.version>
+ <aws-java-sdk.version>1.7.4</aws-java-sdk.version>
+ <httpclient.version>4.5</httpclient.version>
<tajo.root>${project.parent.relativePath}/..</tajo.root>
<extra.source.path>src/main/hadoop-${hadoop.version}</extra.source.path>
</properties>
http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index 2785de4..dd7db31 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -128,13 +128,13 @@ public class FileTablespace extends Tablespace {
@Override
public long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
Path path = new Path(table.getUri());
- ContentSummary summary;
+ long totalVolume = 0L;
try {
- summary = fs.getContentSummary(path);
+ totalVolume = calculateSize(path);
} catch (IOException e) {
throw new TajoInternalError(e);
}
- return summary.getLength();
+ return totalVolume;
}
@Override
@@ -246,6 +246,13 @@ public class FileTablespace extends Tablespace {
return tablets;
}
+ /**
+ * Calculate the total size of all files in the indicated Path
+ *
+ * @param path to use
+ * @return calculated size
+ * @throws IOException
+ */
public long calculateSize(Path tablePath) throws IOException {
FileSystem fs = tablePath.getFileSystem(conf);
long totalSize = 0;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/pom.xml b/tajo-storage/tajo-storage-s3/pom.xml
index a9a541a..6956411 100644
--- a/tajo-storage/tajo-storage-s3/pom.xml
+++ b/tajo-storage/tajo-storage-s3/pom.xml
@@ -61,13 +61,6 @@
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>src/test/resources/dataset/**</exclude>
- <exclude>src/test/resources/queries/**</exclude>
- <exclude>src/test/resources/results/**</exclude>
- </excludes>
- </configuration>
<executions>
<execution>
<phase>verify</phase>
@@ -89,13 +82,9 @@
</execution>
</executions>
</plugin>
-
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skipTests>true</skipTests>
- </configuration>
+ <artifactId>maven-surefire-report-plugin</artifactId>
</plugin>
</plugins>
</build>
@@ -116,84 +105,41 @@
<artifactId>tajo-storage-hdfs</artifactId>
<scope>provided</scope>
</dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>zookeeper</artifactId>
- <groupId>org.apache.zookeeper</groupId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-api</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jersey-json</artifactId>
- <groupId>com.sun.jersey</groupId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1-jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey.jersey-test-framework</groupId>
- <artifactId>jersey-test-framework-grizzly2</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>netty-all</artifactId>
- <groupId>io.netty</groupId>
- </exclusion>
- </exclusions>
</dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk</artifactId>
+ <version>${aws-java-sdk.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
</dependencies>
<profiles>
<profile>
- <!-- Run unit tests in tajo-storage-s3, whereas it is disabled as by default. -->
- <id>test-storage-s3</id>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration combine.self="override">
- <systemProperties>
- <tajo.test.enabled>TRUE</tajo.test.enabled>
- </systemProperties>
- <argLine>-Xms128m -Xmx1024m -Dfile.encoding=UTF-8</argLine>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
<id>docs</id>
<activation>
<activeByDefault>false</activeByDefault>
http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/AnonymousAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/AnonymousAWSCredentialsProvider.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/AnonymousAWSCredentialsProvider.java
new file mode 100644
index 0000000..a884ca8
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/AnonymousAWSCredentialsProvider.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AnonymousAWSCredentials;
+import com.amazonaws.auth.AWSCredentials;
+
+/**
+ * Borrow from org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider.
+ *
+ */
+public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider {
+ public AWSCredentials getCredentials() {
+ return new AnonymousAWSCredentials();
+ }
+
+ public void refresh() {}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/BasicAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/BasicAWSCredentialsProvider.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/BasicAWSCredentialsProvider.java
new file mode 100644
index 0000000..0f4fbde
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/BasicAWSCredentialsProvider.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Borrow from org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider.
+ *
+ */
+public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
+ private final String accessKey;
+ private final String secretKey;
+
+ public BasicAWSCredentialsProvider(String accessKey, String secretKey) {
+ this.accessKey = accessKey;
+ this.secretKey = secretKey;
+ }
+
+ public AWSCredentials getCredentials() {
+ if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
+ return new BasicAWSCredentials(accessKey, secretKey);
+ }
+ throw new AmazonClientException(
+ "Access key or secret key is null");
+ }
+
+ public void refresh() {}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
index 4bcdb60..ee6af31 100644
--- a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
+++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
@@ -18,14 +18,225 @@
package org.apache.tajo.storage.s3;
+import java.io.IOException;
import java.net.URI;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.*;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.storage.FileTablespace;
import net.minidev.json.JSONObject;
+import static org.apache.tajo.storage.s3.TajoS3Constants.*;
+
public class S3TableSpace extends FileTablespace {
+ private final static Log LOG = LogFactory.getLog(S3TableSpace.class);
+
+ private AmazonS3 s3;
+ private boolean s3Enabled;
+ private int maxKeys;
+
public S3TableSpace(String spaceName, URI uri, JSONObject config) {
super(spaceName, uri, config);
}
+
+ @Override
+ public void init(TajoConf tajoConf) throws IOException {
+ super.init(tajoConf);
+
+ try {
+ // Try to get our credentials or just connect anonymously
+ String accessKey = conf.get(ACCESS_KEY, null);
+ String secretKey = conf.get(SECRET_KEY, null);
+
+ String userInfo = uri.getUserInfo();
+ if (userInfo != null) {
+ int index = userInfo.indexOf(':');
+ if (index != -1) {
+ accessKey = userInfo.substring(0, index);
+ secretKey = userInfo.substring(index + 1);
+ } else {
+ accessKey = userInfo;
+ }
+ }
+
+ AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
+ new BasicAWSCredentialsProvider(accessKey, secretKey),
+ new InstanceProfileCredentialsProvider(),
+ new AnonymousAWSCredentialsProvider()
+ );
+
+ ClientConfiguration awsConf = new ClientConfiguration();
+ awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS));
+ boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS);
+ awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
+ awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES));
+ awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT, DEFAULT_ESTABLISH_TIMEOUT));
+ awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT));
+
+ String proxyHost = conf.getTrimmed(PROXY_HOST,"");
+ int proxyPort = conf.getInt(PROXY_PORT, -1);
+ if (!proxyHost.isEmpty()) {
+ awsConf.setProxyHost(proxyHost);
+ if (proxyPort >= 0) {
+ awsConf.setProxyPort(proxyPort);
+ } else {
+ if (secureConnections) {
+ LOG.warn("Proxy host set without port. Using HTTPS default 443");
+ awsConf.setProxyPort(443);
+ } else {
+ LOG.warn("Proxy host set without port. Using HTTP default 80");
+ awsConf.setProxyPort(80);
+ }
+ }
+ String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
+ String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
+ if ((proxyUsername == null) != (proxyPassword == null)) {
+ String msg = "Proxy error: " + PROXY_USERNAME + " or " + PROXY_PASSWORD + " set without the other.";
+ LOG.error(msg);
+ }
+ awsConf.setProxyUsername(proxyUsername);
+ awsConf.setProxyPassword(proxyPassword);
+ awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
+ awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Using proxy server %s:%d as user %s with password %s on domain %s as workstation " +
+ "%s", awsConf.getProxyHost(), awsConf.getProxyPort(), awsConf.getProxyUsername(),
+ awsConf.getProxyPassword(), awsConf.getProxyDomain(), awsConf.getProxyWorkstation()));
+ }
+ } else if (proxyPort >= 0) {
+ String msg = "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
+ LOG.error(msg);
+ }
+
+ s3 = new AmazonS3Client(credentials, awsConf);
+ String endPoint = conf.getTrimmed(ENDPOINT,"");
+ if (!endPoint.isEmpty()) {
+ try {
+ s3.setEndpoint(endPoint);
+ } catch (IllegalArgumentException e) {
+ String msg = "Incorrect endpoint: " + e.getMessage();
+ LOG.error(msg);
+ }
+ }
+
+ maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
+ s3Enabled = true;
+ } catch (NoClassDefFoundError e) {
+ // If the version of hadoop is less than 2.6.0, hadoop doesn't include aws dependencies because it doesn't provide
+ // S3AFileSystem. In this case, tajo never uses aws s3 api directly.
+ LOG.warn(e);
+ s3Enabled = false;
+ } catch (Exception e) {
+ throw new TajoInternalError(e);
+ }
+ }
+
+ /**
+ * Calculate the total size of all objects in the indicated bucket
+ *
+ * @param path to use
+ * @return calculated size
+ * @throws IOException
+ */
+ @Override
+ public long calculateSize(Path path) throws IOException {
+ long totalBucketSize = 0L;
+
+ if (s3Enabled) {
+ String key = pathToKey(path);
+
+ final FileStatus fileStatus = fs.getFileStatus(path);
+
+ if (fileStatus.isDirectory()) {
+ if (!key.isEmpty()) {
+ key = key + "/";
+ }
+
+ ListObjectsRequest request = new ListObjectsRequest();
+ request.setBucketName(uri.getHost());
+ request.setPrefix(key);
+ request.setMaxKeys(maxKeys);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("listStatus: doing listObjects for directory " + key);
+ }
+
+ ObjectListing objects = s3.listObjects(request);
+
+ while (true) {
+ for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+ Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, fs.getWorkingDirectory());
+
+ // Skip over keys that are ourselves and old S3N _$folder$ files
+ if (keyPath.equals(path) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring: " + keyPath);
+ }
+ continue;
+ }
+
+ if (!objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
+ totalBucketSize += summary.getSize();
+ }
+ }
+
+ if (objects.isTruncated()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("listStatus: list truncated - getting next batch");
+ }
+ objects = s3.listNextBatchOfObjects(objects);
+ } else {
+ break;
+ }
+ }
+ } else {
+ return fileStatus.getLen();
+ }
+ } else {
+ totalBucketSize = fs.getContentSummary(path).getLength();
+ }
+
+ return totalBucketSize;
+ }
+
+ private boolean objectRepresentsDirectory(final String name, final long size) {
+ return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L;
+ }
+
+ private Path keyToPath(String key) {
+ return new Path("/" + key);
+ }
+
+ /* Turns a path (relative or otherwise) into an S3 key
+ */
+ private String pathToKey(Path path) {
+ if (!path.isAbsolute()) {
+ path = new Path(fs.getWorkingDirectory(), path);
+ }
+
+ if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
+ return "";
+ }
+
+ return path.toUri().getPath().substring(1);
+ }
+
+ @VisibleForTesting
+ public void setAmazonS3Client(AmazonS3 s3) {
+ this.s3 = s3;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Constants.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Constants.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Constants.java
new file mode 100644
index 0000000..48f76b8
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Constants.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+/**
+ * Borrow from org.apache.hadoop.fs.s3a.TajoS3Constants.
+ *
+ */
+public class TajoS3Constants {
+ // s3 access key
+ public static final String ACCESS_KEY = "fs.s3a.access.key";
+
+ // s3 secret key
+ public static final String SECRET_KEY = "fs.s3a.secret.key";
+
+ //use a custom endpoint?
+ public static final String ENDPOINT = "fs.s3a.endpoint";
+
+ // number of times we should retry errors
+ public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
+ public static final int DEFAULT_MAX_ERROR_RETRIES = 10;
+
+ // connect to s3 over ssl?
+ public static final String SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
+ public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
+
+ // seconds until we give up trying to establish a connection to s3
+ public static final String ESTABLISH_TIMEOUT = "fs.s3a.connection.establish.timeout";
+ public static final int DEFAULT_ESTABLISH_TIMEOUT = 50000;
+
+ // seconds until we give up on a connection to s3
+ public static final String SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
+ public static final int DEFAULT_SOCKET_TIMEOUT = 50000;
+
+ // number of simultaneous connections to s3
+ public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
+ public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
+
+ // number of records to get while paging through a directory listing
+ public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
+ public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
+
+ public static final String S3N_FOLDER_SUFFIX = "_$folder$";
+
+ //connect to s3 through a proxy server?
+ public static final String PROXY_HOST = "fs.s3a.proxy.host";
+ public static final String PROXY_PORT = "fs.s3a.proxy.port";
+ public static final String PROXY_USERNAME = "fs.s3a.proxy.username";
+ public static final String PROXY_PASSWORD = "fs.s3a.proxy.password";
+ public static final String PROXY_DOMAIN = "fs.s3a.proxy.domain";
+ public static final String PROXY_WORKSTATION = "fs.s3a.proxy.workstation";
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockAmazonS3.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockAmazonS3.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockAmazonS3.java
new file mode 100644
index 0000000..a0d8c26
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockAmazonS3.java
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.HttpMethod;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.S3ClientOptions;
+import com.amazonaws.services.s3.S3ResponseMetadata;
+import com.amazonaws.services.s3.model.*;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Date;
+import java.util.List;
+
+import static org.apache.http.HttpStatus.SC_OK;
+
+public class MockAmazonS3 implements AmazonS3 {
+ private int getObjectHttpCode = SC_OK;
+ private int getObjectMetadataHttpCode = SC_OK;
+
+ @Override
+ public void setEndpoint(String endpoint) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setRegion(Region region)
+ throws IllegalArgumentException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setS3ClientOptions(S3ClientOptions clientOptions) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void changeObjectStorageClass(String bucketName, String key, StorageClass newStorageClass)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setObjectRedirectLocation(String bucketName, String key, String newRedirectLocation)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public ObjectListing listObjects(String bucketName) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public ObjectListing listObjects(String bucketName, String prefix)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
+ throws AmazonClientException {
+ if (listObjectsRequest.getBucketName().equals("tajo-test") && listObjectsRequest.getPrefix().equals("test/")) {
+ MockObjectListing objectListing = new MockObjectListing();
+ return objectListing;
+ } else {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+ }
+
+ @Override
+ public ObjectListing listNextBatchOfObjects(ObjectListing previousObjectListing)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public VersionListing listVersions(String bucketName, String prefix)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public VersionListing listNextBatchOfVersions(VersionListing previousVersionListing)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public VersionListing listVersions(String bucketName, String prefix, String keyMarker, String versionIdMarker,
+ String delimiter, Integer maxResults) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public VersionListing listVersions(ListVersionsRequest listVersionsRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public Owner getS3AccountOwner()
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public boolean doesBucketExist(String bucketName)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public List<Bucket> listBuckets()
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public List<Bucket> listBuckets(ListBucketsRequest listBucketsRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public String getBucketLocation(String bucketName)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public String getBucketLocation(GetBucketLocationRequest getBucketLocationRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public Bucket createBucket(CreateBucketRequest createBucketRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public Bucket createBucket(String bucketName)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public Bucket createBucket(String bucketName, com.amazonaws.services.s3.model.Region region)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public Bucket createBucket(String bucketName, String region)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public AccessControlList getObjectAcl(String bucketName, String key)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public AccessControlList getObjectAcl(String bucketName, String key, String versionId)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setObjectAcl(String bucketName, String key, AccessControlList acl)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setObjectAcl(String bucketName, String key, CannedAccessControlList acl)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setObjectAcl(String bucketName, String key, String versionId, AccessControlList acl)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setObjectAcl(String bucketName, String key, String versionId, CannedAccessControlList acl)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public AccessControlList getBucketAcl(String bucketName)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketAcl(SetBucketAclRequest setBucketAclRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public AccessControlList getBucketAcl(GetBucketAclRequest getBucketAclRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketAcl(String bucketName, AccessControlList acl)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketAcl(String bucketName, CannedAccessControlList acl)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public ObjectMetadata getObjectMetadata(String bucketName, String key) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public S3Object getObject(String bucketName, String key)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public S3Object getObject(GetObjectRequest getObjectRequest) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public ObjectMetadata getObject(GetObjectRequest getObjectRequest, File destinationFile)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteBucket(DeleteBucketRequest deleteBucketRequest) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteBucket(String bucketName) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public PutObjectResult putObject(String bucketName, String key, File file) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public CopyObjectResult copyObject(String sourceBucketName, String sourceKey, String destinationBucketName,
+ String destinationKey) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public CopyPartResult copyPart(CopyPartRequest copyPartRequest) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteObject(String bucketName, String key) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteObject(DeleteObjectRequest deleteObjectRequest) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteObjectsRequest) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteVersion(String bucketName, String key, String versionId) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteVersion(DeleteVersionRequest deleteVersionRequest) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public BucketLoggingConfiguration getBucketLoggingConfiguration(String bucketName) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketLoggingConfiguration(SetBucketLoggingConfigurationRequest setBucketLoggingConfigurationRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public BucketVersioningConfiguration getBucketVersioningConfiguration(String bucketName)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketVersioningConfiguration(SetBucketVersioningConfigurationRequest
+ setBucketVersioningConfigurationRequest) throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public BucketLifecycleConfiguration getBucketLifecycleConfiguration(String bucketName) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketLifecycleConfiguration(String bucketName,
+ BucketLifecycleConfiguration bucketLifecycleConfiguration) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketLifecycleConfiguration(SetBucketLifecycleConfigurationRequest
+ setBucketLifecycleConfigurationRequest) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteBucketLifecycleConfiguration(String bucketName) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteBucketLifecycleConfiguration(DeleteBucketLifecycleConfigurationRequest
+ deleteBucketLifecycleConfigurationRequest) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public BucketCrossOriginConfiguration getBucketCrossOriginConfiguration(String bucketName) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketCrossOriginConfiguration(String bucketName, BucketCrossOriginConfiguration
+ bucketCrossOriginConfiguration) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketCrossOriginConfiguration(SetBucketCrossOriginConfigurationRequest
+ setBucketCrossOriginConfigurationRequest) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteBucketCrossOriginConfiguration(String bucketName) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteBucketCrossOriginConfiguration(DeleteBucketCrossOriginConfigurationRequest
+ deleteBucketCrossOriginConfigurationRequest) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public BucketTaggingConfiguration getBucketTaggingConfiguration(String bucketName) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketTaggingConfiguration(String bucketName, BucketTaggingConfiguration bucketTaggingConfiguration) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketTaggingConfiguration(SetBucketTaggingConfigurationRequest setBucketTaggingConfigurationRequest) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteBucketTaggingConfiguration(String bucketName) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteBucketTaggingConfiguration(DeleteBucketTaggingConfigurationRequest
+ deleteBucketTaggingConfigurationRequest) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public BucketNotificationConfiguration getBucketNotificationConfiguration(String bucketName)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketNotificationConfiguration(SetBucketNotificationConfigurationRequest
+ setBucketNotificationConfigurationRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketNotificationConfiguration(String bucketName, BucketNotificationConfiguration
+ bucketNotificationConfiguration)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public BucketWebsiteConfiguration getBucketWebsiteConfiguration(String bucketName)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public BucketWebsiteConfiguration getBucketWebsiteConfiguration(GetBucketWebsiteConfigurationRequest
+ getBucketWebsiteConfigurationRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketWebsiteConfiguration(String bucketName, BucketWebsiteConfiguration configuration)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketWebsiteConfiguration(SetBucketWebsiteConfigurationRequest setBucketWebsiteConfigurationRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteBucketWebsiteConfiguration(String bucketName)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteBucketWebsiteConfiguration(DeleteBucketWebsiteConfigurationRequest
+ deleteBucketWebsiteConfigurationRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public BucketPolicy getBucketPolicy(String bucketName)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public BucketPolicy getBucketPolicy(GetBucketPolicyRequest getBucketPolicyRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketPolicy(String bucketName, String policyText)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void setBucketPolicy(SetBucketPolicyRequest setBucketPolicyRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteBucketPolicy(String bucketName)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void deleteBucketPolicy(DeleteBucketPolicyRequest deleteBucketPolicyRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public URL generatePresignedUrl(String bucketName, String key, Date expiration)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public URL generatePresignedUrl(String bucketName, String key, Date expiration, HttpMethod method)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public URL generatePresignedUrl(GeneratePresignedUrlRequest generatePresignedUrlRequest)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest request)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public UploadPartResult uploadPart(UploadPartRequest request)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public PartListing listParts(ListPartsRequest request)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void abortMultipartUpload(AbortMultipartUploadRequest request)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest request)
+ throws AmazonClientException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public S3ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void restoreObject(RestoreObjectRequest request)
+ throws AmazonServiceException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+
+ @Override
+ public void restoreObject(String bucketName, String key, int expirationInDays)
+ throws AmazonServiceException {
+ throw new TajoInternalError(new UnsupportedException());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockObjectListing.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockObjectListing.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockObjectListing.java
new file mode 100644
index 0000000..96a7f80
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockObjectListing.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class MockObjectListing extends ObjectListing {
+
+ @Override
+ public List<S3ObjectSummary> getObjectSummaries() {
+ final String bucketName = "tajo-test";
+
+ List<S3ObjectSummary> objectSummaries = Lists.newArrayList();
+ objectSummaries.add(getS3ObjectSummary(bucketName, "test/data01", 10L));
+ objectSummaries.add(getS3ObjectSummary(bucketName, "test/data02", 10L));
+ objectSummaries.add(getS3ObjectSummary(bucketName, "test/data03", 10L));
+
+ return objectSummaries;
+ }
+
+ private S3ObjectSummary getS3ObjectSummary(String bucketName, String key, long size) {
+ S3ObjectSummary objectSummary = new S3ObjectSummary();
+ objectSummary.setBucketName(bucketName);
+ objectSummary.setKey(key);
+ objectSummary.setSize(size);
+ return objectSummary;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java
index 15c6a33..6526269 100644
--- a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java
+++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.UnsupportedException;
import java.io.IOException;
import java.net.URI;
@@ -36,7 +38,6 @@ public class MockS3FileSystem extends FileSystem {
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
}
-
/**
* Return the protocol scheme for the FileSystem.
* <p/>
@@ -60,18 +61,18 @@ public class MockS3FileSystem extends FileSystem {
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
- return null;
+ throw new TajoInternalError(new UnsupportedException());
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
short replication, long blockSize, Progressable progress) throws IOException {
- return null;
+ throw new TajoInternalError(new UnsupportedException());
}
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
- return null;
+ throw new TajoInternalError(new UnsupportedException());
}
@Override
@@ -95,7 +96,7 @@ public class MockS3FileSystem extends FileSystem {
@Override
public Path getWorkingDirectory() {
- return null;
+ return new Path(uri);
}
@Override
@@ -105,6 +106,12 @@ public class MockS3FileSystem extends FileSystem {
@Override
public FileStatus getFileStatus(Path f) throws IOException {
- return null;
+ if (f.equals(new Path(TestS3TableSpace.S3_URI, "test"))
+ || f.equals(new Path(TestS3TableSpace.S3N_URI, "test"))
+ || f.equals(new Path(TestS3TableSpace.S3A_URI, "test"))) {
+ return new FileStatus(0, true, 1, 0, 0, f);
+ } else {
+ throw new TajoInternalError(new UnsupportedException());
+ }
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4f35c28e/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
index 2d06778..2b630c0 100644
--- a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
+++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
@@ -19,6 +19,7 @@
package org.apache.tajo.storage.s3;
import net.minidev.json.JSONObject;
+import org.apache.hadoop.fs.Path;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.TablespaceManager;
import org.junit.AfterClass;
@@ -28,35 +29,103 @@ import org.junit.Test;
import java.io.IOException;
import java.net.URI;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class TestS3TableSpace {
- public static final String SPACENAME = "s3_cluster";
+ public static final String S3_SPACENAME = "s3_cluster";
+ public static final String S3N_SPACENAME = "s3N_cluster";
+ public static final String S3A_SPACENAME = "s3A_cluster";
+
public static final String S3_URI = "s3://tajo-test/";
+ public static final String S3N_URI = "s3n://tajo-test/";
+ public static final String S3A_URI = "s3a://tajo-test/";
@BeforeClass
public static void setUp() throws Exception {
- S3TableSpace tablespace = new S3TableSpace(SPACENAME, URI.create(S3_URI), new JSONObject());
-
+ // Add tablespace for s3 prefix
+ S3TableSpace s3TableSpace = new S3TableSpace(S3_SPACENAME, URI.create(S3_URI), new JSONObject());
TajoConf tajoConf = new TajoConf();
tajoConf.set("fs.s3.impl", MockS3FileSystem.class.getName());
- tablespace.init(tajoConf);
+ tajoConf.set("fs.s3.awsAccessKeyId", "test_access_key_id");
+ tajoConf.set("fs.s3.awsSecretAccessKey", "test_secret_access_key");
+ s3TableSpace.init(tajoConf);
+ TablespaceManager.addTableSpaceForTest(s3TableSpace);
- TablespaceManager.addTableSpaceForTest(tablespace);
+ // Add tablespace for s3n prefix
+ S3TableSpace s3nTableSpace = new S3TableSpace(S3N_SPACENAME, URI.create(S3N_URI), new JSONObject());
+ tajoConf = new TajoConf();
+ tajoConf.set("fs.s3n.impl", MockS3FileSystem.class.getName());
+ tajoConf.set("fs.s3n.awsAccessKeyId", "test_access_key_id");
+ tajoConf.set("fs.s3n.awsSecretAccessKey", "test_secret_access_key");
+ s3nTableSpace.init(tajoConf);
+ TablespaceManager.addTableSpaceForTest(s3nTableSpace);
+
+ // Add tablespace for s3a prefix
+ S3TableSpace s3aTableSpace = new S3TableSpace(S3A_SPACENAME, URI.create(S3A_URI), new JSONObject());
+ tajoConf = new TajoConf();
+ tajoConf.set("fs.s3a.impl", MockS3FileSystem.class.getName());
+ tajoConf.set("fs.s3a.access.key", "test_access_key_id");
+ tajoConf.set("fs.s3a.secret.key", "test_secret_access_key");
+ s3aTableSpace.init(tajoConf);
+ TablespaceManager.addTableSpaceForTest(s3aTableSpace);
}
@AfterClass
public static void tearDown() throws IOException {
- TablespaceManager.removeTablespaceForTest(SPACENAME);
+ TablespaceManager.removeTablespaceForTest(S3_SPACENAME);
+ TablespaceManager.removeTablespaceForTest(S3N_SPACENAME);
+ TablespaceManager.removeTablespaceForTest(S3A_SPACENAME);
}
@Test
public void testTablespaceHandler() throws Exception {
- assertTrue((TablespaceManager.getByName(SPACENAME)) instanceof S3TableSpace);
- assertEquals(SPACENAME, (TablespaceManager.getByName(SPACENAME).getName()));
-
+ // Verify the tablespace for s3 prefix
+ assertTrue((TablespaceManager.getByName(S3_SPACENAME)) instanceof S3TableSpace);
+ assertEquals(S3_SPACENAME, (TablespaceManager.getByName(S3_SPACENAME).getName()));
assertTrue((TablespaceManager.get(S3_URI)) instanceof S3TableSpace);
assertEquals(S3_URI, TablespaceManager.get(S3_URI).getUri().toASCIIString());
+
+ // Verify the tablespace for s3n prefix
+ assertTrue((TablespaceManager.getByName(S3N_SPACENAME)) instanceof S3TableSpace);
+ assertEquals(S3N_SPACENAME, (TablespaceManager.getByName(S3N_SPACENAME).getName()));
+ assertTrue((TablespaceManager.get(S3N_URI)) instanceof S3TableSpace);
+ assertEquals(S3N_URI, TablespaceManager.get(S3N_URI).getUri().toASCIIString());
+
+ // Verify the tablespace for s3a prefix
+ assertTrue((TablespaceManager.getByName(S3A_SPACENAME)) instanceof S3TableSpace);
+ assertEquals(S3A_SPACENAME, (TablespaceManager.getByName(S3A_SPACENAME).getName()));
+ assertTrue((TablespaceManager.get(S3A_URI)) instanceof S3TableSpace);
+ assertEquals(S3A_URI, TablespaceManager.get(S3A_URI).getUri().toASCIIString());
+ }
+
+ @Test
+ public void testCalculateSizeWithS3Prefix() throws Exception {
+ Path path = new Path(S3_URI, "test");
+ assertTrue((TablespaceManager.getByName(S3_SPACENAME)) instanceof S3TableSpace);
+ S3TableSpace tableSpace = TablespaceManager.get(path.toUri());
+ tableSpace.setAmazonS3Client(new MockAmazonS3());
+ long size = tableSpace.calculateSize(path);
+ assertEquals(30L, size);
+ }
+
+ @Test
+ public void testCalculateSizeWithS3NPrefix() throws Exception {
+ Path path = new Path(S3N_URI, "test");
+ assertTrue((TablespaceManager.getByName(S3N_SPACENAME)) instanceof S3TableSpace);
+ S3TableSpace tableSpace = TablespaceManager.get(path.toUri());
+ tableSpace.setAmazonS3Client(new MockAmazonS3());
+ long size = tableSpace.calculateSize(path);
+ assertEquals(30L, size);
}
+
+ @Test
+ public void testCalculateSizeWithS3APrefix() throws Exception {
+ Path path = new Path(S3A_URI, "test");
+ assertTrue((TablespaceManager.getByName(S3A_SPACENAME)) instanceof S3TableSpace);
+ S3TableSpace tableSpace = TablespaceManager.get(path.toUri());
+ tableSpace.setAmazonS3Client(new MockAmazonS3());
+ long size = tableSpace.calculateSize(path);
+ assertEquals(30L, size);
+ }
+
}