Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2008/06/06 23:06:31 UTC
svn commit: r664126 [1/2] - in /hadoop/core/trunk: ./ conf/
src/java/org/apache/hadoop/fs/s3/ src/java/org/apache/hadoop/fs/s3native/
src/test/ src/test/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/fs/
src/test/org/apache/hadoop/fs/s3/ src/test/or...
Author: cutting
Date: Fri Jun 6 14:06:30 2008
New Revision: 664126
URL: http://svn.apache.org/viewvc?rev=664126&view=rev
Log:
HADOOP-930. Add support for native S3 files. Contributed by Tom White.
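
For orientation, a minimal sketch of writing a file through the new s3n
filesystem via the standard FileSystem API (the s3n scheme and the fs.s3n.*
credential properties come from the hadoop-default.xml and S3Credentials
changes below; the bucket name and key values are placeholders):

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class S3nWriteSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      conf.set("fs.s3n.awsAccessKeyId", "PLACEHOLDER_ID");          // placeholder
      conf.set("fs.s3n.awsSecretAccessKey", "PLACEHOLDER_SECRET");  // placeholder
      FileSystem fs = FileSystem.get(URI.create("s3n://examplebucket"), conf);
      // The file is buffered locally (fs.s3.buffer.dir) and uploaded on close()
      // as a single native S3 object, readable by any other S3 tool.
      FSDataOutputStream out = fs.create(new Path("/demo/hello.txt"));
      out.write("hello, s3n".getBytes());
      out.close();
    }
  }

Unlike the block-based s3 scheme, the object lands under the key
demo/hello.txt rather than as opaque blocks.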
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3Credentials.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/
hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/FileMetadata.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/PartialListing.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/package.html
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSFileSystemContract.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/FileSystemContractBaseTest.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemContractTest.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystemContract.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestS3Credentials.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestS3FileSystem.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/Jets3tNativeS3FileSystemContractTest.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/TestInMemoryNativeS3FileSystemContract.java
Removed:
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestS3Uri.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/hadoop-default.xml
hadoop/core/trunk/conf/log4j.properties
hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/package.html
hadoop/core/trunk/src/test/hadoop-site.xml
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=664126&r1=664125&r2=664126&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jun 6 14:06:30 2008
@@ -149,6 +149,8 @@
HADOOP-3230. Add ability to get counter values from command
line. (tomwhite via omalley)
+ HADOOP-930. Add support for native S3 files. (tomwhite via cutting)
+
IMPROVEMENTS
HADOOP-2928. Remove deprecated FileSystem.getContentLength().
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=664126&r1=664125&r2=664126&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Jun 6 14:06:30 2008
@@ -182,6 +182,12 @@
</property>
<property>
+ <name>fs.s3n.impl</name>
+ <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
+ <description>The FileSystem for s3n: (Native S3) uris.</description>
+</property>
+
+<property>
<name>fs.kfs.impl</name>
<value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value>
<description>The FileSystem for kfs: uris.</description>
@@ -558,15 +564,15 @@
<name>fs.s3.buffer.dir</name>
<value>${hadoop.tmp.dir}/s3</value>
<description>Determines where on the local filesystem the S3 filesystem
- should store its blocks before it sends them to S3
- or after it retrieves them from S3.
+ should store files before sending them to S3
+ (or after retrieving them from S3).
</description>
</property>
<property>
<name>fs.s3.maxRetries</name>
<value>4</value>
- <description>The maximum number of retries for reading or writing blocks to S3,
+ <description>The maximum number of retries for reading files from or writing files to S3,
before we signal failure to the application.
</description>
</property>
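
Both settings, together with fs.s3.sleepTimeSeconds (read by the retry-policy
wiring further down in this diff), can also be overridden programmatically; a
minimal sketch with illustrative values:

  import org.apache.hadoop.conf.Configuration;

  Configuration conf = new Configuration();
  conf.set("fs.s3.buffer.dir", "/tmp/hadoop-s3"); // local staging area for transfers
  conf.setInt("fs.s3.maxRetries", 8);             // attempts before surfacing the failure
  conf.setLong("fs.s3.sleepTimeSeconds", 10);     // fixed sleep between attempts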
Modified: hadoop/core/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/log4j.properties?rev=664126&r1=664125&r2=664126&view=diff
==============================================================================
--- hadoop/core/trunk/conf/log4j.properties (original)
+++ hadoop/core/trunk/conf/log4j.properties Fri Jun 6 14:06:30 2008
@@ -84,6 +84,9 @@
#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
#
# Event Counter Appender
# Sends counts of logging messages at different severity levels to Hadoop Metrics.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java?rev=664126&r1=664125&r2=664126&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java Fri Jun 6 14:06:30 2008
@@ -78,49 +78,12 @@
this.conf = conf;
- if (uri.getHost() == null) {
- throw new IllegalArgumentException("Invalid hostname in URI " + uri);
- }
-
+ S3Credentials s3Credentials = new S3Credentials();
+ s3Credentials.initialize(uri, conf);
try {
- String accessKey = null;
- String secretAccessKey = null;
- String userInfo = uri.getUserInfo();
- if (userInfo != null) {
- int index = userInfo.indexOf(':');
- if (index != -1) {
- accessKey = userInfo.substring(0, index);
- secretAccessKey = userInfo.substring(index + 1);
- } else {
- accessKey = userInfo;
- }
- }
- if (accessKey == null) {
- accessKey = conf.get("fs.s3.awsAccessKeyId");
- }
- if (secretAccessKey == null) {
- secretAccessKey = conf.get("fs.s3.awsSecretAccessKey");
- }
- if (accessKey == null && secretAccessKey == null) {
- throw new IllegalArgumentException("AWS " +
- "Access Key ID and Secret Access Key " +
- "must be specified as the username " +
- "or password (respectively) of a s3 URL, " +
- "or by setting the " +
- "fs.s3.awsAccessKeyId or " +
- "fs.s3.awsSecretAccessKey properties (respectively).");
- } else if (accessKey == null) {
- throw new IllegalArgumentException("AWS " +
- "Access Key ID must be specified " +
- "as the username of a s3 URL, or by setting the " +
- "fs.s3.awsAccessKeyId property.");
- } else if (secretAccessKey == null) {
- throw new IllegalArgumentException("AWS " +
- "Secret Access Key must be specified " +
- "as the password of a s3 URL, or by setting the " +
- "fs.s3.awsSecretAccessKey property.");
- }
- AWSCredentials awsCredentials = new AWSCredentials(accessKey, secretAccessKey);
+ AWSCredentials awsCredentials =
+ new AWSCredentials(s3Credentials.getAccessKey(),
+ s3Credentials.getSecretAccessKey());
this.s3Service = new RestS3Service(awsCredentials);
} catch (S3ServiceException e) {
if (e.getCause() instanceof IOException) {
Added: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3Credentials.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3Credentials.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3Credentials.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3Credentials.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3;
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * <p>
+ * Extracts AWS credentials from the filesystem URI or configuration.
+ * </p>
+ */
+public class S3Credentials {
+
+ private String accessKey;
+ private String secretAccessKey;
+
+ /**
+ * @throws IllegalArgumentException if credentials for S3 cannot be
+ * determined.
+ */
+ public void initialize(URI uri, Configuration conf) {
+ if (uri.getHost() == null) {
+ throw new IllegalArgumentException("Invalid hostname in URI " + uri);
+ }
+
+ String userInfo = uri.getUserInfo();
+ if (userInfo != null) {
+ int index = userInfo.indexOf(':');
+ if (index != -1) {
+ accessKey = userInfo.substring(0, index);
+ secretAccessKey = userInfo.substring(index + 1);
+ } else {
+ accessKey = userInfo;
+ }
+ }
+
+ String scheme = uri.getScheme();
+ String accessKeyProperty = String.format("fs.%s.awsAccessKeyId", scheme);
+ String secretAccessKeyProperty =
+ String.format("fs.%s.awsSecretAccessKey", scheme);
+ if (accessKey == null) {
+ accessKey = conf.get(accessKeyProperty);
+ }
+ if (secretAccessKey == null) {
+ secretAccessKey = conf.get(secretAccessKeyProperty);
+ }
+ if (accessKey == null && secretAccessKey == null) {
+ throw new IllegalArgumentException("AWS " +
+ "Access Key ID and Secret Access " +
+ "Key must be specified as the " +
+ "username or password " +
+ "(respectively) of a " + scheme +
+ " URL, or by setting the " +
+ accessKeyProperty + " or " +
+ secretAccessKeyProperty +
+ " properties (respectively).");
+ } else if (accessKey == null) {
+ throw new IllegalArgumentException("AWS " +
+ "Access Key ID must be specified " +
+ "as the username of a " + scheme +
+ " URL, or by setting the " +
+ accessKeyProperty + " property.");
+ } else if (secretAccessKey == null) {
+ throw new IllegalArgumentException("AWS " +
+ "Secret Access Key must be " +
+ "specified as the password of a " +
+ scheme + " URL, or by setting the " +
+ secretAccessKeyProperty +
+ " property.");
+ }
+
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public String getSecretAccessKey() {
+ return secretAccessKey;
+ }
+}
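
A short usage sketch of the extracted class (key values are placeholders).
Because the property names are derived from the URI scheme, an s3n:// URI
reads fs.s3n.awsAccessKeyId while an s3:// URI reads fs.s3.awsAccessKeyId:

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.s3.S3Credentials;

  Configuration conf = new Configuration();
  conf.set("fs.s3n.awsAccessKeyId", "PLACEHOLDER_ID");
  conf.set("fs.s3n.awsSecretAccessKey", "PLACEHOLDER_SECRET");

  S3Credentials credentials = new S3Credentials();
  credentials.initialize(URI.create("s3n://examplebucket"), conf);
  // Userinfo in the URI ("s3n://ID:SECRET@bucket") takes precedence over the properties.
  String accessKey = credentials.getAccessKey();             // "PLACEHOLDER_ID"
  String secretAccessKey = credentials.getSecretAccessKey(); // "PLACEHOLDER_SECRET"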
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java?rev=664126&r1=664125&r2=664126&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java Fri Jun 6 14:06:30 2008
@@ -23,19 +23,18 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
@@ -43,20 +42,18 @@
/**
* <p>
- * A {@link FileSystem} backed by <a href="http://aws.amazon.com/s3">Amazon S3</a>.
+ * A block-based {@link FileSystem} backed by
+ * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
* </p>
+ * @see NativeS3FileSystem
*/
public class S3FileSystem extends FileSystem {
- private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
-
private URI uri;
private FileSystemStore store;
- private FileSystem localFs;
-
- private Path workingDir = new Path("/user", System.getProperty("user.name"));
+ private Path workingDir;
public S3FileSystem() {
// set store in initialize()
@@ -79,7 +76,8 @@
store.initialize(uri, conf);
setConf(conf);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
- this.localFs = get(URI.create("file:///"), conf);
+ this.workingDir =
+ new Path("/user", System.getProperty("user.name")).makeQualified(this);
}
private static FileSystemStore createDefaultStore(Configuration conf) {
@@ -131,15 +129,30 @@
@Override
public boolean mkdirs(Path path, FsPermission permission) throws IOException {
Path absolutePath = makeAbsolute(path);
+ List<Path> paths = new ArrayList<Path>();
+ do {
+ paths.add(0, absolutePath);
+ absolutePath = absolutePath.getParent();
+ } while (absolutePath != null);
+
+ boolean result = true;
+ for (Path p : paths) {
+ result &= mkdir(p);
+ }
+ return result;
+ }
+
+ private boolean mkdir(Path path) throws IOException {
+ Path absolutePath = makeAbsolute(path);
INode inode = store.retrieveINode(absolutePath);
if (inode == null) {
store.storeINode(absolutePath, INode.DIRECTORY_INODE);
} else if (inode.isFile()) {
throw new IOException(String.format(
- "Can't make directory for path %s since it is a file.", absolutePath));
+ "Can't make directory for path %s since it is a file.",
+ absolutePath));
}
- Path parent = absolutePath.getParent();
- return (parent == null || mkdirs(parent));
+ return true;
}
@Override
@@ -263,7 +276,10 @@
if (inode == null) {
return false;
}
- Path newDst = new Path(oldSrc.toString().replaceFirst(src.toString(), dst.toString()));
+ String oldSrcPath = oldSrc.toUri().getPath();
+ String srcPath = src.toUri().getPath();
+ String dstPath = dst.toUri().getPath();
+ Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
store.storeINode(newDst, inode);
store.deleteINode(oldSrc);
}
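
The rename hunk above stops pattern-matching over the whole URI string, where
the scheme or authority text could also match, and rewrites only the path
component. A sketch of the corrected computation (names hypothetical):

  import org.apache.hadoop.fs.Path;

  Path src = new Path("s3://bucket/user/a");
  Path dst = new Path("s3://bucket/user/b");
  Path oldSrc = new Path("s3://bucket/user/a/file");

  // Rewrite the prefix of the path component only, not the full URI text.
  // Note replaceFirst still interprets its first argument as a regex, a
  // remaining caveat for paths containing special characters.
  String newPath = oldSrc.toUri().getPath()
      .replaceFirst(src.toUri().getPath(), dst.toUri().getPath());
  Path newDst = new Path(newPath);  // "/user/b/file"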
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/package.html
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/package.html?rev=664126&r1=664125&r2=664126&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/package.html (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/package.html Fri Jun 6 14:06:30 2008
@@ -19,8 +19,9 @@
<body>
-<p>A distributed implementation of {@link
-org.apache.hadoop.fs.FileSystem} that uses <a href="http://aws.amazon.com/s3">Amazon S3</a>.</p>
+<p>A distributed, block-based implementation of {@link
+org.apache.hadoop.fs.FileSystem} that uses <a href="http://aws.amazon.com/s3">Amazon S3</a>
+as a backing store.</p>
<p>
Files are stored in S3 as blocks (represented by
Added: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/FileMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/FileMetadata.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/FileMetadata.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/FileMetadata.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+/**
+ * <p>
+ * Holds basic metadata for a file stored in a {@link NativeFileSystemStore}.
+ * </p>
+ */
+class FileMetadata {
+ private final String key;
+ private final long length;
+ private final long lastModified;
+
+ public FileMetadata(String key, long length, long lastModified) {
+ this.key = key;
+ this.length = length;
+ this.lastModified = lastModified;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public long getLastModified() {
+ return lastModified;
+ }
+
+ @Override
+ public String toString() {
+ return "FileMetadata[" + key + ", " + length + ", " + lastModified + "]";
+ }
+
+}
Added: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3.S3Credentials;
+import org.apache.hadoop.fs.s3.S3Exception;
+import org.jets3t.service.S3ObjectsChunk;
+import org.jets3t.service.S3Service;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.security.AWSCredentials;
+
+class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
+
+ private S3Service s3Service;
+ private S3Bucket bucket;
+
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ S3Credentials s3Credentials = new S3Credentials();
+ s3Credentials.initialize(uri, conf);
+ try {
+ AWSCredentials awsCredentials =
+ new AWSCredentials(s3Credentials.getAccessKey(),
+ s3Credentials.getSecretAccessKey());
+ this.s3Service = new RestS3Service(awsCredentials);
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ bucket = new S3Bucket(uri.getHost());
+
+ createBucket(bucket.getName());
+
+ }
+
+ private void createBucket(String bucketName) throws IOException {
+ try {
+ s3Service.createBucket(bucketName);
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ public void storeFile(String key, File file, byte[] md5Hash)
+ throws IOException {
+
+ BufferedInputStream in = null;
+ try {
+ in = new BufferedInputStream(new FileInputStream(file));
+ S3Object object = new S3Object(key);
+ object.setDataInputStream(in);
+ object.setContentType("binary/octet-stream");
+ object.setContentLength(file.length());
+ if (md5Hash != null) {
+ object.setMd5Hash(md5Hash);
+ }
+ s3Service.putObject(bucket, object);
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ public void storeEmptyFile(String key) throws IOException {
+ try {
+ S3Object object = new S3Object(key);
+ object.setDataInputStream(new ByteArrayInputStream(new byte[0]));
+ object.setContentType("binary/octet-stream");
+ object.setContentLength(0);
+ s3Service.putObject(bucket, object);
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ public FileMetadata retrieveMetadata(String key) throws IOException {
+ try {
+ S3Object object = s3Service.getObjectDetails(bucket, key);
+ return new FileMetadata(key, object.getContentLength(),
+ object.getLastModifiedDate().getTime());
+ } catch (S3ServiceException e) {
+ // Following is brittle. Is there a better way?
+ if (e.getMessage().contains("ResponseCode=404")) {
+ return null;
+ }
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ public InputStream retrieve(String key) throws IOException {
+ try {
+ S3Object object = s3Service.getObject(bucket, key);
+ return object.getDataInputStream();
+ } catch (S3ServiceException e) {
+ if (e.getS3ErrorCode().equals("NoSuchKey")) {
+ return null;
+ }
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ public InputStream retrieve(String key, long byteRangeStart)
+ throws IOException {
+ try {
+ S3Object object = s3Service.getObject(bucket, key, null, null, null,
+ null, byteRangeStart, null);
+ return object.getDataInputStream();
+ } catch (S3ServiceException e) {
+ if (e.getS3ErrorCode().equals("NoSuchKey")) {
+ return null;
+ }
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ public PartialListing list(String prefix, int maxListingLength)
+ throws IOException {
+ return list(prefix, maxListingLength, null);
+ }
+
+ public PartialListing list(String prefix, int maxListingLength,
+ String priorLastKey) throws IOException {
+ try {
+ if (prefix.length() > 0 &&
+ !prefix.endsWith(NativeS3FileSystem.PATH_DELIMITER)) {
+ prefix += NativeS3FileSystem.PATH_DELIMITER;
+ }
+ S3ObjectsChunk chunk = s3Service.listObjectsChunked(bucket.getName(),
+ prefix, NativeS3FileSystem.PATH_DELIMITER, maxListingLength,
+ priorLastKey);
+
+ FileMetadata[] fileMetadata =
+ new FileMetadata[chunk.getObjects().length];
+ for (int i = 0; i < fileMetadata.length; i++) {
+ S3Object object = chunk.getObjects()[i];
+ fileMetadata[i] = new FileMetadata(object.getKey(),
+ object.getContentLength(), object.getLastModifiedDate().getTime());
+ }
+ return new PartialListing(chunk.getPriorLastKey(), fileMetadata,
+ chunk.getCommonPrefixes());
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ public void delete(String key) throws IOException {
+ try {
+ s3Service.deleteObject(bucket, key);
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ public void purge(String prefix) throws IOException {
+ try {
+ S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
+ for (int i = 0; i < objects.length; i++) {
+ s3Service.deleteObject(bucket, objects[i].getKey());
+ }
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ public void dump() throws IOException {
+ StringBuilder sb = new StringBuilder("S3 Native Filesystem, ");
+ sb.append(bucket.getName()).append("\n");
+ try {
+ S3Object[] objects = s3Service.listObjects(bucket);
+ for (int i = 0; i < objects.length; i++) {
+ sb.append(objects[i].getKey()).append("\n");
+ }
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ System.out.println(sb);
+ }
+
+}
\ No newline at end of file
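
Every catch block in this class repeats the same unwrap-or-wrap translation of
S3ServiceException. A hypothetical helper, not part of the commit, that
captures the pattern:

  // Hypothetical refactoring sketch (same class assumed).
  private static void handleServiceException(S3ServiceException e)
      throws IOException {
    // jets3t wraps transport failures; surface those as the original
    // IOException, and anything else as Hadoop's S3Exception.
    if (e.getCause() instanceof IOException) {
      throw (IOException) e.getCause();
    }
    throw new S3Exception(e);
  }

The compiler cannot see that such a helper always throws, so value-returning
callers would still need an unreachable return statement after invoking it,
which is probably why the commit keeps the translation inline.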
Added: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * <p>
+ * An abstraction for a key-based {@link File} store.
+ * </p>
+ */
+interface NativeFileSystemStore {
+
+ void initialize(URI uri, Configuration conf) throws IOException;
+
+ void storeFile(String key, File file, byte[] md5Hash) throws IOException;
+ void storeEmptyFile(String key) throws IOException;
+
+ FileMetadata retrieveMetadata(String key) throws IOException;
+ InputStream retrieve(String key) throws IOException;
+ InputStream retrieve(String key, long byteRangeStart) throws IOException;
+
+ PartialListing list(String prefix, int maxListingLength) throws IOException;
+ PartialListing list(String prefix, int maxListingLength, String priorLastKey)
+ throws IOException;
+
+ void delete(String key) throws IOException;
+
+ /**
+ * Delete all keys with the given prefix. Used for testing.
+ * @throws IOException
+ */
+ void purge(String prefix) throws IOException;
+
+ /**
+ * Diagnostic method to dump state to the console.
+ * @throws IOException
+ */
+ void dump() throws IOException;
+}
Added: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3.S3Exception;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * <p>
+ * A {@link FileSystem} for reading and writing files stored on
+ * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
+ * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem}, this implementation
+ * stores files on S3 in their native form, so they can be read by other
+ * S3 tools.
+ * </p>
+ * @see org.apache.hadoop.fs.s3.S3FileSystem
+ */
+public class NativeS3FileSystem extends FileSystem {
+
+ public static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
+
+ private static final String FOLDER_SUFFIX = "_$folder$";
+ private static final long MAX_S3_FILE_SIZE = 5 * 1024 * 1024 * 1024L;
+ static final String PATH_DELIMITER = Path.SEPARATOR;
+ private static final int S3_MAX_LISTING_LENGTH = 1000;
+
+ private class NativeS3FsInputStream extends FSInputStream {
+
+ private InputStream in;
+ private final String key;
+ private long pos = 0;
+
+ public NativeS3FsInputStream(InputStream in, String key) {
+ this.in = in;
+ this.key = key;
+ }
+
+ public synchronized int read() throws IOException {
+ int result = in.read();
+ if (result != -1) {
+ pos++; // advance by the one byte read; result is the byte value, not a count
+ }
+ return result;
+ }
+ public synchronized int read(byte[] b, int off, int len)
+ throws IOException {
+
+ int result = in.read(b, off, len);
+ if (result > 0) {
+ pos += result;
+ }
+ return result;
+ }
+
+ public void close() throws IOException {
+ in.close();
+ }
+
+ public synchronized void seek(long pos) throws IOException {
+ in.close();
+ in = store.retrieve(key, pos);
+ this.pos = pos;
+ }
+ public synchronized long getPos() throws IOException {
+ return pos;
+ }
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+ }
+
+ private class NativeS3FsOutputStream extends OutputStream {
+
+ private Configuration conf;
+ private String key;
+ private File backupFile;
+ private OutputStream backupStream;
+ private MessageDigest digest;
+ private boolean closed;
+
+ public NativeS3FsOutputStream(Configuration conf,
+ NativeFileSystemStore store, String key, Progressable progress,
+ int bufferSize) throws IOException {
+ this.conf = conf;
+ this.key = key;
+ this.backupFile = newBackupFile();
+ try {
+ this.digest = MessageDigest.getInstance("MD5");
+ this.backupStream = new BufferedOutputStream(new DigestOutputStream(
+ new FileOutputStream(backupFile), this.digest));
+ } catch (NoSuchAlgorithmException e) {
+ LOG.warn("Cannot load MD5 digest algorithm," +
+ "skipping message integrity check.", e);
+ this.backupStream = new BufferedOutputStream(
+ new FileOutputStream(backupFile));
+ }
+ }
+
+ private File newBackupFile() throws IOException {
+ File dir = new File(conf.get("fs.s3.buffer.dir"));
+ if (!dir.exists() && !dir.mkdirs()) {
+ throw new IOException("Cannot create S3 buffer directory: " + dir);
+ }
+ File result = File.createTempFile("output-", ".tmp", dir);
+ result.deleteOnExit();
+ return result;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ backupStream.flush();
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+
+ backupStream.close();
+
+ try {
+ byte[] md5Hash = digest == null ? null : digest.digest();
+ store.storeFile(key, backupFile, md5Hash);
+ } finally {
+ if (!backupFile.delete()) {
+ LOG.warn("Could not delete temporary s3n file: " + backupFile);
+ }
+ super.close();
+ closed = true;
+ }
+
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ backupStream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ backupStream.write(b, off, len);
+ }
+
+
+ }
+
+ private URI uri;
+ private NativeFileSystemStore store;
+ private Path workingDir;
+
+ public NativeS3FileSystem() {
+ // set store in initialize()
+ }
+
+ public NativeS3FileSystem(NativeFileSystemStore store) {
+ this.store = store;
+ }
+
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ if (store == null) {
+ store = createDefaultStore(conf);
+ }
+ store.initialize(uri, conf);
+ setConf(conf);
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+ this.workingDir =
+ new Path("/user", System.getProperty("user.name")).makeQualified(this);
+ }
+
+ private static NativeFileSystemStore createDefaultStore(Configuration conf) {
+ NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
+
+ RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ conf.getInt("fs.s3.maxRetries", 4),
+ conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
+ exceptionToPolicyMap.put(IOException.class, basePolicy);
+ exceptionToPolicyMap.put(S3Exception.class, basePolicy);
+
+ RetryPolicy methodPolicy = RetryPolicies.retryByException(
+ RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+ Map<String, RetryPolicy> methodNameToPolicyMap =
+ new HashMap<String, RetryPolicy>();
+ methodNameToPolicyMap.put("storeFile", methodPolicy);
+
+ return (NativeFileSystemStore)
+ RetryProxy.create(NativeFileSystemStore.class, store,
+ methodNameToPolicyMap);
+ }
+
+ private static String pathToKey(Path path) {
+ if (!path.isAbsolute()) {
+ throw new IllegalArgumentException("Path must be absolute: " + path);
+ }
+ return path.toUri().getPath().substring(1); // remove initial slash
+ }
+
+ private static Path keyToPath(String key) {
+ return new Path("/" + key);
+ }
+
+ private Path makeAbsolute(Path path) {
+ if (path.isAbsolute()) {
+ return path;
+ }
+ return new Path(workingDir, path);
+ }
+
+ /** This optional operation is not yet supported. */
+ public FSDataOutputStream append(Path f, int bufferSize,
+ Progressable progress) throws IOException {
+ throw new IOException("Not supported");
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+
+ if (exists(f) && !overwrite) {
+ throw new IOException("File already exists:"+f);
+ }
+ Path absolutePath = makeAbsolute(f);
+ String key = pathToKey(absolutePath);
+ return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
+ key, progress, bufferSize), statistics);
+ }
+
+ @Override
+ @Deprecated
+ public boolean delete(Path path) throws IOException {
+ return delete(path, true);
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ FileStatus status;
+ try {
+ status = getFileStatus(f);
+ } catch (FileNotFoundException e) {
+ return false;
+ }
+ Path absolutePath = makeAbsolute(f);
+ String key = pathToKey(absolutePath);
+ if (status.isDir()) {
+ FileStatus[] contents = listStatus(f);
+ if (!recursive && contents.length > 0) {
+ throw new IOException("Directory " + f.toString() + " is not empty.");
+ }
+ for (FileStatus p : contents) {
+ if (!delete(p.getPath(), recursive)) {
+ return false;
+ }
+ }
+ store.delete(key + FOLDER_SUFFIX);
+ } else {
+ store.delete(key);
+ }
+ return true;
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+
+ Path absolutePath = makeAbsolute(f);
+ String key = pathToKey(absolutePath);
+
+ if (key.length() == 0) { // root always exists
+ return newDirectory(absolutePath);
+ }
+
+ FileMetadata meta = store.retrieveMetadata(key);
+ if (meta != null) {
+ return newFile(meta, absolutePath);
+ }
+ if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
+ return newDirectory(absolutePath);
+ }
+
+ PartialListing listing = store.list(key, 1);
+ if (listing.getFiles().length > 0 ||
+ listing.getCommonPrefixes().length > 0) {
+ return newDirectory(absolutePath);
+ }
+
+ throw new FileNotFoundException(absolutePath +
+ ": No such file or directory.");
+
+ }
+
+ @Override
+ public URI getUri() {
+ return uri;
+ }
+
+ /**
+ * <p>
+ * If <code>f</code> is a file, this method will make a single call to S3.
+ * If <code>f</code> is a directory, this method will make a maximum of
+ * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
+ * files and directories contained directly in <code>f</code>.
+ * </p>
+ */
+ @Override
+ public FileStatus[] listStatus(Path f) throws IOException {
+
+ Path absolutePath = makeAbsolute(f);
+ String key = pathToKey(absolutePath);
+
+ if (key.length() > 0) {
+ FileMetadata meta = store.retrieveMetadata(key);
+ if (meta != null) {
+ return new FileStatus[] { newFile(meta, absolutePath) };
+ }
+ }
+
+ URI pathUri = absolutePath.toUri();
+ Set<FileStatus> status = new TreeSet<FileStatus>();
+ String priorLastKey = null;
+ do {
+ PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH,
+ priorLastKey);
+ for (FileMetadata fileMetadata : listing.getFiles()) {
+ Path subpath = keyToPath(fileMetadata.getKey());
+ String relativePath = pathUri.relativize(subpath.toUri()).getPath();
+ if (relativePath.endsWith(FOLDER_SUFFIX)) {
+ status.add(newDirectory(new Path(absolutePath,
+ relativePath.substring(0,
+ relativePath.indexOf(FOLDER_SUFFIX)))));
+ } else {
+ status.add(newFile(fileMetadata, subpath));
+ }
+ }
+ for (String commonPrefix : listing.getCommonPrefixes()) {
+ Path subpath = keyToPath(commonPrefix);
+ String relativePath = pathUri.relativize(subpath.toUri()).getPath();
+ status.add(newDirectory(new Path(absolutePath, relativePath)));
+ }
+ priorLastKey = listing.getPriorLastKey();
+ } while (priorLastKey != null);
+
+ if (status.isEmpty() &&
+ store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
+ return null;
+ }
+
+ return status.toArray(new FileStatus[0]);
+ }
+
+ private FileStatus newFile(FileMetadata meta, Path path) {
+ return new FileStatus(meta.getLength(), false, 1, MAX_S3_FILE_SIZE,
+ meta.getLastModified(), path.makeQualified(this));
+ }
+
+ private FileStatus newDirectory(Path path) {
+ return new FileStatus(0, true, 1, MAX_S3_FILE_SIZE, 0,
+ path.makeQualified(this));
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ Path absolutePath = makeAbsolute(f);
+ List<Path> paths = new ArrayList<Path>();
+ do {
+ paths.add(0, absolutePath);
+ absolutePath = absolutePath.getParent();
+ } while (absolutePath != null);
+
+ boolean result = true;
+ for (Path path : paths) {
+ result &= mkdir(path);
+ }
+ return result;
+ }
+
+ private boolean mkdir(Path f) throws IOException {
+ try {
+ FileStatus fileStatus = getFileStatus(f);
+ if (!fileStatus.isDir()) {
+ throw new IOException(String.format(
+ "Can't make directory for path %s since it is a file.", f));
+
+ }
+ } catch (FileNotFoundException e) {
+ String key = pathToKey(f) + FOLDER_SUFFIX;
+ store.storeEmptyFile(key);
+ }
+ return true;
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ if (!exists(f)) {
+ throw new FileNotFoundException(f.toString());
+ }
+ Path absolutePath = makeAbsolute(f);
+ String key = pathToKey(absolutePath);
+ return new FSDataInputStream(new BufferedFSInputStream(
+ new NativeS3FsInputStream(store.retrieve(key), key), bufferSize));
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ throw new IOException("Not supported");
+ }
+
+ /**
+ * Set the working directory to the given directory.
+ */
+ @Override
+ public void setWorkingDirectory(Path newDir) {
+ workingDir = newDir;
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return workingDir;
+ }
+
+}
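
A read-side companion to the write sketch near the top of this message (bucket
and key names are placeholders). Note that rename() is unsupported in this
first cut, and seek() is implemented by closing the stream and issuing a fresh
ranged GET:

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class S3nReadSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();  // credentials as in the earlier sketch
      FileSystem fs = FileSystem.get(URI.create("s3n://examplebucket"), conf);
      // Per the listStatus javadoc: at most (n / 1000) + 2 S3 calls for a
      // directory containing n entries.
      for (FileStatus stat : fs.listStatus(new Path("/demo"))) {
        System.out.println(stat.getPath() + "\t" + stat.getLen());
      }
      FSDataInputStream in = fs.open(new Path("/demo/hello.txt"));
      in.seek(7);                    // re-opens the object at byte offset 7
      System.out.println(in.read());
      in.close();
    }
  }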
Added: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/PartialListing.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/PartialListing.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/PartialListing.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/PartialListing.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+/**
+ * <p>
+ * Holds information on a directory listing for a
+ * {@link NativeFileSystemStore}.
+ * This includes the {@link FileMetadata files} and directories
+ * (their names) contained in a directory.
+ * </p>
+ * <p>
+ * This listing may be returned in chunks, so a <code>priorLastKey</code>
+ * is provided so that the next chunk may be requested.
+ * </p>
+ * @see NativeFileSystemStore#list(String, int, String)
+ */
+class PartialListing {
+
+ private final String priorLastKey;
+ private final FileMetadata[] files;
+ private final String[] commonPrefixes;
+
+ public PartialListing(String priorLastKey, FileMetadata[] files,
+ String[] commonPrefixes) {
+ this.priorLastKey = priorLastKey;
+ this.files = files;
+ this.commonPrefixes = commonPrefixes;
+ }
+
+ public FileMetadata[] getFiles() {
+ return files;
+ }
+
+ public String[] getCommonPrefixes() {
+ return commonPrefixes;
+ }
+
+ public String getPriorLastKey() {
+ return priorLastKey;
+ }
+
+}
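
How a chunked listing is consumed, mirroring the loop in
NativeS3FileSystem.listStatus above (the store variable and the prefix are
assumed to be in scope; the types are package-private, so this would live in
the same package):

  String priorLastKey = null;
  do {
    PartialListing listing = store.list("user/tom", 1000, priorLastKey);
    for (FileMetadata file : listing.getFiles()) {
      System.out.println(file.getKey() + " " + file.getLength());
    }
    for (String dir : listing.getCommonPrefixes()) {
      System.out.println(dir + " (directory)");
    }
    priorLastKey = listing.getPriorLastKey();  // null once the final chunk arrives
  } while (priorLastKey != null);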
Added: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/package.html
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/package.html?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/package.html (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3native/package.html Fri Jun 6 14:06:30 2008
@@ -0,0 +1,32 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<body>
+
+<p>
+A distributed implementation of {@link
+org.apache.hadoop.fs.FileSystem} for reading and writing files on
+<a href="http://aws.amazon.com/s3">Amazon S3</a>.
+Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem}, which is block-based,
+this implementation stores
+files on S3 in their native form for interoperability with other S3 tools.
+</p>
+
+</body>
+</html>
Modified: hadoop/core/trunk/src/test/hadoop-site.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hadoop-site.xml?rev=664126&r1=664125&r2=664126&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hadoop-site.xml (original)
+++ hadoop/core/trunk/src/test/hadoop-site.xml Fri Jun 6 14:06:30 2008
@@ -41,4 +41,10 @@
This is required by FTPFileSystem</description>
</property>
+<property>
+ <name>test.fs.s3n.name</name>
+ <value>s3n:///</value>
+ <description>The name of the s3n file system for testing.</description>
+</property>
+
</configuration>
Added: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSFileSystemContract.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSFileSystemContract.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSFileSystemContract.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHDFSFileSystemContract.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+
+public class TestHDFSFileSystemContract extends FileSystemContractBaseTest {
+
+ private MiniDFSCluster cluster;
+
+ @Override
+ protected void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ cluster = new MiniDFSCluster(conf, 2, true, null);
+ fs = cluster.getFileSystem();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ cluster.shutdown();
+ }
+
+ @Override
+ protected boolean renameSupported() {
+ // disable for the moment as rename tests are not working on HDFS yet
+ return false;
+ }
+
+}
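
The Added list also names TestInMemoryNativeS3FileSystemContract (its diff is
in part 2 of this commit); a hedged sketch of what such a subclass of the base
test below looks like, with the store wiring assumed:

  package org.apache.hadoop.fs.s3native;

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystemContractBaseTest;

  public class InMemoryNativeS3ContractSketch extends FileSystemContractBaseTest {

    @Override
    protected void setUp() throws Exception {
      Configuration conf = new Configuration();
      // Assumption: the added InMemoryNativeFileSystemStore stands in for S3,
      // so the contract tests run without network access or credentials.
      NativeS3FileSystem s3nFs =
          new NativeS3FileSystem(new InMemoryNativeFileSystemStore());
      s3nFs.initialize(URI.create("s3n://test-bucket"), conf);
      fs = s3nFs;
    }
  }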
Added: hadoop/core/trunk/src/test/org/apache/hadoop/fs/FileSystemContractBaseTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/FileSystemContractBaseTest.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/FileSystemContractBaseTest.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/FileSystemContractBaseTest.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * <p>
+ * A collection of tests for the contract of the {@link FileSystem}.
+ * This test should be used for general-purpose implementations of
+ * {@link FileSystem}, that is, implementations that support all of the
+ * functionality of {@link FileSystem}.
+ * </p>
+ * <p>
+ * To test a given {@link FileSystem} implementation create a subclass of this
+ * test and override {@link #setUp()} to initialize the <code>fs</code>
+ * {@link FileSystem} instance variable.
+ * </p>
+ */
+public abstract class FileSystemContractBaseTest extends TestCase {
+
+ protected FileSystem fs;
+ private byte[] data = new byte[getBlockSize() * 2]; // two blocks of data
+ {
+ for (int i = 0; i < data.length; i++) {
+ data[i] = (byte) (i % 10);
+ }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ fs.delete(path("/test"), true);
+ }
+
+ protected int getBlockSize() {
+ return 1024;
+ }
+
+ protected String getDefaultWorkingDirectory() {
+ return "/user/" + System.getProperty("user.name");
+ }
+
+ protected boolean renameSupported() {
+ return true;
+ }
+
+ public void testWorkingDirectory() throws Exception {
+
+ Path workDir = path(getDefaultWorkingDirectory());
+ assertEquals(workDir, fs.getWorkingDirectory());
+
+ fs.setWorkingDirectory(path("."));
+ assertEquals(workDir, fs.getWorkingDirectory());
+
+ fs.setWorkingDirectory(path(".."));
+ assertEquals(workDir.getParent(), fs.getWorkingDirectory());
+
+ Path relativeDir = path("hadoop");
+ fs.setWorkingDirectory(relativeDir);
+ assertEquals(relativeDir, fs.getWorkingDirectory());
+
+ Path absoluteDir = path("/test/hadoop");
+ fs.setWorkingDirectory(absoluteDir);
+ assertEquals(absoluteDir, fs.getWorkingDirectory());
+
+ }
+
+ public void testMkdirs() throws Exception {
+ Path testDir = path("/test/hadoop");
+ assertFalse(fs.exists(testDir));
+ assertFalse(fs.isFile(testDir));
+
+ assertTrue(fs.mkdirs(testDir));
+
+ assertTrue(fs.exists(testDir));
+ assertFalse(fs.isFile(testDir));
+
+ assertTrue(fs.mkdirs(testDir));
+
+ assertTrue(fs.exists(testDir));
+ assertFalse(fs.isFile(testDir));
+
+ Path parentDir = testDir.getParent();
+ assertTrue(fs.exists(parentDir));
+ assertFalse(fs.isFile(parentDir));
+
+ Path grandparentDir = parentDir.getParent();
+ assertTrue(fs.exists(grandparentDir));
+ assertFalse(fs.isFile(grandparentDir));
+
+ }
+
+ public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {
+ Path testDir = path("/test/hadoop");
+ assertFalse(fs.exists(testDir));
+ assertTrue(fs.mkdirs(testDir));
+ assertTrue(fs.exists(testDir));
+
+ createFile(path("/test/hadoop/file"));
+
+ Path testSubDir = path("/test/hadoop/file/subdir");
+ try {
+ fs.mkdirs(testSubDir);
+ fail("Should throw IOException.");
+ } catch (IOException e) {
+ // expected
+ }
+ assertFalse(fs.exists(testSubDir));
+
+ Path testDeepSubDir = path("/test/hadoop/file/deep/sub/dir");
+ try {
+ fs.mkdirs(testDeepSubDir);
+ fail("Should throw IOException.");
+ } catch (IOException e) {
+ // expected
+ }
+ assertFalse(fs.exists(testDeepSubDir));
+
+ }
+
+ public void testGetFileStatusThrowsExceptionForNonExistentFile()
+ throws Exception {
+ try {
+ fs.getFileStatus(path("/test/hadoop/file"));
+ fail("Should throw FileNotFoundException");
+ } catch (FileNotFoundException e) {
+ // expected
+ }
+ }
+
+ public void testListStatusReturnsNullForNonExistentFile() throws Exception {
+ assertNull(fs.listStatus(path("/test/hadoop/file")));
+ }
+
+ public void testListStatus() throws Exception {
+ Path[] testDirs = { path("/test/hadoop/a"),
+ path("/test/hadoop/b"),
+ path("/test/hadoop/c/1"), };
+ assertFalse(fs.exists(testDirs[0]));
+
+ for (Path path : testDirs) {
+ assertTrue(fs.mkdirs(path));
+ }
+
+ FileStatus[] paths = fs.listStatus(path("/test"));
+ assertEquals(1, paths.length);
+ assertEquals(path("/test/hadoop"), paths[0].getPath());
+
+ paths = fs.listStatus(path("/test/hadoop"));
+ assertEquals(3, paths.length);
+ assertEquals(path("/test/hadoop/a"), paths[0].getPath());
+ assertEquals(path("/test/hadoop/b"), paths[1].getPath());
+ assertEquals(path("/test/hadoop/c"), paths[2].getPath());
+
+ paths = fs.listStatus(path("/test/hadoop/a"));
+ assertEquals(0, paths.length);
+ }
+
+ public void testWriteReadAndDeleteEmptyFile() throws Exception {
+ writeReadAndDelete(0);
+ }
+
+ public void testWriteReadAndDeleteHalfABlock() throws Exception {
+ writeReadAndDelete(getBlockSize() / 2);
+ }
+
+ public void testWriteReadAndDeleteOneBlock() throws Exception {
+ writeReadAndDelete(getBlockSize());
+ }
+
+ public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
+ writeReadAndDelete(getBlockSize() + (getBlockSize() / 2));
+ }
+
+ public void testWriteReadAndDeleteTwoBlocks() throws Exception {
+ writeReadAndDelete(getBlockSize() * 2);
+ }
+
+ private void writeReadAndDelete(int len) throws IOException {
+ Path path = path("/test/hadoop/file");
+
+ fs.mkdirs(path.getParent());
+
+ FSDataOutputStream out = fs.create(path, false,
+ fs.getConf().getInt("io.file.buffer.size", 4096),
+ (short) 1, getBlockSize());
+ out.write(data, 0, len);
+ out.close();
+
+ assertTrue("Exists", fs.exists(path));
+ assertEquals("Length", len, fs.getFileStatus(path).getLen());
+
+ FSDataInputStream in = fs.open(path);
+ byte[] buf = new byte[len];
+ in.readFully(0, buf);
+ in.close();
+
+ assertEquals(len, buf.length);
+ for (int i = 0; i < buf.length; i++) {
+ assertEquals("Position " + i, data[i], buf[i]);
+ }
+
+ assertTrue("Deleted", fs.delete(path, false));
+
+ assertFalse("No longer exists", fs.exists(path));
+
+ }
+
+ public void testOverwrite() throws IOException {
+ Path path = path("/test/hadoop/file");
+
+ fs.mkdirs(path.getParent());
+
+ createFile(path);
+
+ assertTrue("Exists", fs.exists(path));
+ assertEquals("Length", data.length, fs.getFileStatus(path).getLen());
+
+ try {
+ fs.create(path, false);
+ fail("Should throw IOException.");
+ } catch (IOException e) {
+ // Expected
+ }
+
+ FSDataOutputStream out = fs.create(path, true);
+ out.write(data, 0, data.length);
+ out.close();
+
+ assertTrue("Exists", fs.exists(path));
+ assertEquals("Length", data.length, fs.getFileStatus(path).getLen());
+  }
+
+ public void testWriteInNonExistentDirectory() throws IOException {
+ Path path = path("/test/hadoop/file");
+ assertFalse("Parent doesn't exist", fs.exists(path.getParent()));
+ createFile(path);
+
+ assertTrue("Exists", fs.exists(path));
+ assertEquals("Length", data.length, fs.getFileStatus(path).getLen());
+ assertTrue("Parent exists", fs.exists(path.getParent()));
+ }
+
+ public void testDeleteNonExistentFile() throws IOException {
+ Path path = path("/test/hadoop/file");
+ assertFalse("Doesn't exist", fs.exists(path));
+ assertFalse("No deletion", fs.delete(path, true));
+ }
+
+ public void testDeleteRecursively() throws IOException {
+ Path dir = path("/test/hadoop");
+ Path file = path("/test/hadoop/file");
+ Path subdir = path("/test/hadoop/subdir");
+
+ createFile(file);
+ assertTrue("Created subdir", fs.mkdirs(subdir));
+
+ assertTrue("File exists", fs.exists(file));
+ assertTrue("Dir exists", fs.exists(dir));
+ assertTrue("Subdir exists", fs.exists(subdir));
+
+ try {
+ fs.delete(dir, false);
+ fail("Should throw IOException.");
+ } catch (IOException e) {
+ // expected
+ }
+ assertTrue("File still exists", fs.exists(file));
+ assertTrue("Dir still exists", fs.exists(dir));
+ assertTrue("Subdir still exists", fs.exists(subdir));
+
+ assertTrue("Deleted", fs.delete(dir, true));
+ assertFalse("File doesn't exist", fs.exists(file));
+ assertFalse("Dir doesn't exist", fs.exists(dir));
+ assertFalse("Subdir doesn't exist", fs.exists(subdir));
+ }
+
+ public void testDeleteEmptyDirectory() throws IOException {
+ Path dir = path("/test/hadoop");
+ assertTrue(fs.mkdirs(dir));
+ assertTrue("Dir exists", fs.exists(dir));
+ assertTrue("Deleted", fs.delete(dir, false));
+ assertFalse("Dir doesn't exist", fs.exists(dir));
+ }
+
+ public void testRenameNonExistentPath() throws Exception {
+ if (!renameSupported()) return;
+
+ Path src = path("/test/hadoop/path");
+ Path dst = path("/test/new/newpath");
+ rename(src, dst, false, false, false);
+ }
+
+ public void testRenameFileMoveToNonExistentDirectory() throws Exception {
+ if (!renameSupported()) return;
+
+ Path src = path("/test/hadoop/file");
+ createFile(src);
+ Path dst = path("/test/new/newfile");
+ rename(src, dst, false, true, false);
+ }
+
+ public void testRenameFileMoveToExistingDirectory() throws Exception {
+ if (!renameSupported()) return;
+
+ Path src = path("/test/hadoop/file");
+ createFile(src);
+ Path dst = path("/test/new/newfile");
+ fs.mkdirs(dst.getParent());
+ rename(src, dst, true, false, true);
+ }
+
+ public void testRenameFileAsExistingFile() throws Exception {
+ if (!renameSupported()) return;
+
+ Path src = path("/test/hadoop/file");
+ createFile(src);
+ Path dst = path("/test/new/newfile");
+ createFile(dst);
+ rename(src, dst, false, true, true);
+ }
+
+ public void testRenameFileAsExistingDirectory() throws Exception {
+ if (!renameSupported()) return;
+
+ Path src = path("/test/hadoop/file");
+ createFile(src);
+ Path dst = path("/test/new/newdir");
+ fs.mkdirs(dst);
+ rename(src, dst, true, false, true);
+ assertTrue("Destination changed",
+ fs.exists(path("/test/new/newdir/file")));
+ }
+
+ public void testRenameDirectoryMoveToNonExistentDirectory()
+ throws Exception {
+ if (!renameSupported()) return;
+
+ Path src = path("/test/hadoop/dir");
+ fs.mkdirs(src);
+ Path dst = path("/test/new/newdir");
+ rename(src, dst, false, true, false);
+ }
+
+ public void testRenameDirectoryMoveToExistingDirectory() throws Exception {
+ if (!renameSupported()) return;
+
+ Path src = path("/test/hadoop/dir");
+ fs.mkdirs(src);
+ createFile(path("/test/hadoop/dir/file1"));
+ createFile(path("/test/hadoop/dir/subdir/file2"));
+
+ Path dst = path("/test/new/newdir");
+ fs.mkdirs(dst.getParent());
+ rename(src, dst, true, false, true);
+
+ assertFalse("Nested file1 exists",
+ fs.exists(path("/test/hadoop/dir/file1")));
+ assertFalse("Nested file2 exists",
+ fs.exists(path("/test/hadoop/dir/subdir/file2")));
+ assertTrue("Renamed nested file1 exists",
+ fs.exists(path("/test/new/newdir/file1")));
+ assertTrue("Renamed nested exists",
+ fs.exists(path("/test/new/newdir/subdir/file2")));
+ }
+
+ public void testRenameDirectoryAsExistingFile() throws Exception {
+ if (!renameSupported()) return;
+
+ Path src = path("/test/hadoop/dir");
+ fs.mkdirs(src);
+ Path dst = path("/test/new/newfile");
+ createFile(dst);
+ rename(src, dst, false, true, true);
+ }
+
+ public void testRenameDirectoryAsExistingDirectory() throws Exception {
+ if (!renameSupported()) return;
+
+ Path src = path("/test/hadoop/dir");
+ fs.mkdirs(src);
+ createFile(path("/test/hadoop/dir/file1"));
+ createFile(path("/test/hadoop/dir/subdir/file2"));
+
+ Path dst = path("/test/new/newdir");
+ fs.mkdirs(dst);
+ rename(src, dst, true, false, true);
+ assertTrue("Destination changed",
+ fs.exists(path("/test/new/newdir/dir")));
+ assertFalse("Nested file1 exists",
+ fs.exists(path("/test/hadoop/dir/file1")));
+ assertFalse("Nested file2 exists",
+ fs.exists(path("/test/hadoop/dir/subdir/file2")));
+ assertTrue("Renamed nested file1 exists",
+ fs.exists(path("/test/new/newdir/dir/file1")));
+ assertTrue("Renamed nested exists",
+ fs.exists(path("/test/new/newdir/dir/subdir/file2")));
+ }
+
+ protected Path path(String pathString) {
+ return new Path(pathString).makeQualified(fs);
+ }
+
+ protected void createFile(Path path) throws IOException {
+ FSDataOutputStream out = fs.create(path);
+ out.write(data, 0, data.length);
+ out.close();
+ }
+
+ private void rename(Path src, Path dst, boolean renameSucceeded,
+ boolean srcExists, boolean dstExists) throws IOException {
+ assertEquals("Rename result", renameSucceeded, fs.rename(src, dst));
+ assertEquals("Source exists", srcExists, fs.exists(src));
+ assertEquals("Destination exists", dstExists, fs.exists(dst));
+ }
+}
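
The contract suite above is designed to be subclassed once per FileSystem
implementation, as the S3 subclasses below illustrate: a subclass assigns the
protected fs field in setUp() and inherits every contract test. A minimal
sketch of what a subclass for the local filesystem could look like (the class
name, the test.build.data scratch property, and the path() override are
illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileSystemContractBaseTest;
    import org.apache.hadoop.fs.Path;

    public class TestLocalFileSystemContract extends FileSystemContractBaseTest {

      @Override
      protected void setUp() throws Exception {
        // The base class drives every contract test through this field.
        fs = FileSystem.getLocal(new Configuration());
      }

      // Root test paths in a scratch directory rather than "/" so the suite
      // does not write to the real filesystem root.
      @Override
      protected Path path(String pathString) {
        return new Path(System.getProperty("test.build.data", "/tmp")
            + pathString).makeQualified(fs);
      }
    }
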
Added: hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemContractTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemContractTest.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemContractTest.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemContractTest.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+
+public class Jets3tS3FileSystemContractTest
+ extends S3FileSystemContractBaseTest {
+
+ @Override
+ FileSystemStore getFileSystemStore() throws IOException {
+ return new Jets3tFileSystemStore();
+ }
+
+}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+
+public abstract class S3FileSystemContractBaseTest
+ extends FileSystemContractBaseTest {
+
+ private FileSystemStore store;
+
+ abstract FileSystemStore getFileSystemStore() throws IOException;
+
+ @Override
+ protected void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ store = getFileSystemStore();
+ fs = new S3FileSystem(store);
+ fs.initialize(URI.create(conf.get("test.fs.s3.name")), conf);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ store.purge();
+ super.tearDown();
+ }
+
+}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystemContract.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystemContract.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystemContract.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystemContract.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+
+public class TestInMemoryS3FileSystemContract
+ extends S3FileSystemContractBaseTest {
+
+ @Override
+ FileSystemStore getFileSystemStore() throws IOException {
+ return new InMemoryFileSystemStore();
+ }
+
+}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestS3Credentials.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestS3Credentials.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestS3Credentials.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestS3Credentials.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3;
+
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class TestS3Credentials extends TestCase {
+ public void testInvalidHostnameWithUnderscores() throws Exception {
+ S3Credentials s3Credentials = new S3Credentials();
+ try {
+ s3Credentials.initialize(new URI("s3://a:b@c_d"), new Configuration());
+ fail("Should throw IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ assertEquals("Invalid hostname in URI s3://a:b@c_d", e.getMessage());
+ }
+ }
+}
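
The test above only exercises the failure path; on success, S3Credentials
resolves the access key and secret either from the URI's user-info part (as
in s3://ID:SECRET@bucket) or from configuration. A hedged sketch of the
configuration-driven form follows; the property names fs.s3.awsAccessKeyId
and fs.s3.awsSecretAccessKey and the accessor names are assumptions to verify
against S3Credentials.java:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3.S3Credentials;

    public class S3CredentialsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.s3.awsAccessKeyId", "ACCESS");      // assumed property name
        conf.set("fs.s3.awsSecretAccessKey", "SECRET");  // assumed property name

        S3Credentials credentials = new S3Credentials();
        credentials.initialize(URI.create("s3://mybucket"), conf);
        // getAccessKey()/getSecretAccessKey() are assumed accessor names.
        System.out.println(credentials.getAccessKey());
      }
    }
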
Added: hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestS3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestS3FileSystem.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestS3FileSystem.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3/TestS3FileSystem.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class TestS3FileSystem extends TestCase {
+
+ public void testInitialization() throws IOException {
+ initializationTest("s3://a:b@c", "s3://a:b@c");
+ initializationTest("s3://a:b@c/", "s3://a:b@c");
+ initializationTest("s3://a:b@c/path", "s3://a:b@c");
+ initializationTest("s3://a@c", "s3://a@c");
+ initializationTest("s3://a@c/", "s3://a@c");
+ initializationTest("s3://a@c/path", "s3://a@c");
+ initializationTest("s3://c", "s3://c");
+ initializationTest("s3://c/", "s3://c");
+ initializationTest("s3://c/path", "s3://c");
+ }
+
+ private void initializationTest(String initializationUri, String expectedUri)
+ throws IOException {
+
+ S3FileSystem fs = new S3FileSystem(new InMemoryFileSystemStore());
+ fs.initialize(URI.create(initializationUri), new Configuration());
+ assertEquals(URI.create(expectedUri), fs.getUri());
+ }
+
+}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * <p>
+ * A stub implementation of {@link NativeFileSystemStore} for testing
+ * {@link NativeS3FileSystem} without actually connecting to S3.
+ * </p>
+ */
+class InMemoryNativeFileSystemStore implements NativeFileSystemStore {
+
+ private Configuration conf;
+
+ private SortedMap<String, FileMetadata> metadataMap =
+ new TreeMap<String, FileMetadata>();
+ private SortedMap<String, byte[]> dataMap = new TreeMap<String, byte[]>();
+
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ this.conf = conf;
+ }
+
+ public void storeEmptyFile(String key) throws IOException {
+ metadataMap.put(key, new FileMetadata(key, 0, System.currentTimeMillis()));
+ dataMap.put(key, new byte[0]);
+ }
+
+ public void storeFile(String key, File file, byte[] md5Hash)
+ throws IOException {
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] buf = new byte[8192];
+ int numRead;
+ BufferedInputStream in = null;
+ try {
+ in = new BufferedInputStream(new FileInputStream(file));
+ while ((numRead = in.read(buf)) >= 0) {
+ out.write(buf, 0, numRead);
+ }
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
+ metadataMap.put(key,
+ new FileMetadata(key, file.length(), System.currentTimeMillis()));
+ dataMap.put(key, out.toByteArray());
+ }
+
+ public InputStream retrieve(String key) throws IOException {
+ return retrieve(key, 0);
+ }
+
+ public InputStream retrieve(String key, long byteRangeStart)
+ throws IOException {
+
+ byte[] data = dataMap.get(key);
+ File file = createTempFile();
+ BufferedOutputStream out = null;
+ try {
+ out = new BufferedOutputStream(new FileOutputStream(file));
+ out.write(data, (int) byteRangeStart,
+ data.length - (int) byteRangeStart);
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ return new FileInputStream(file);
+ }
+
+ private File createTempFile() throws IOException {
+ File dir = new File(conf.get("fs.s3.buffer.dir"));
+ if (!dir.exists() && !dir.mkdirs()) {
+ throw new IOException("Cannot create S3 buffer directory: " + dir);
+ }
+ File result = File.createTempFile("test-", ".tmp", dir);
+ result.deleteOnExit();
+ return result;
+ }
+
+ public FileMetadata retrieveMetadata(String key) throws IOException {
+ return metadataMap.get(key);
+ }
+
+ public PartialListing list(String prefix, int maxListingLength)
+ throws IOException {
+ return list(prefix, maxListingLength, null);
+ }
+
+  public PartialListing list(String prefix, int maxListingLength,
+      String priorLastKey) throws IOException {
+    // priorLastKey is ignored: this in-memory stub always scans from the start.
+
+ if (prefix.length() > 0 &&
+ !prefix.endsWith(NativeS3FileSystem.PATH_DELIMITER)) {
+ prefix += NativeS3FileSystem.PATH_DELIMITER;
+ }
+
+ List<FileMetadata> metadata = new ArrayList<FileMetadata>();
+ SortedSet<String> commonPrefixes = new TreeSet<String>();
+ for (String key : dataMap.keySet()) {
+ if (key.startsWith(prefix)) {
+ int delimIndex = key.indexOf(NativeS3FileSystem.PATH_DELIMITER,
+ prefix.length());
+ if (delimIndex == -1) {
+ metadata.add(retrieveMetadata(key));
+ } else {
+ String commonPrefix = key.substring(0, delimIndex);
+ commonPrefixes.add(commonPrefix);
+ }
+ }
+      // Stop early once the listing reaches maxListingLength entries.
+      if (metadata.size() + commonPrefixes.size() == maxListingLength) {
+        return new PartialListing(key, metadata.toArray(new FileMetadata[0]),
+          commonPrefixes.toArray(new String[0]));
+      }
+ }
+ return new PartialListing(null, metadata.toArray(new FileMetadata[0]),
+ commonPrefixes.toArray(new String[0]));
+ }
+
+ public void delete(String key) throws IOException {
+ metadataMap.remove(key);
+ dataMap.remove(key);
+ }
+
+ public void purge(String prefix) throws IOException {
+ Iterator<Entry<String, FileMetadata>> i =
+ metadataMap.entrySet().iterator();
+ while (i.hasNext()) {
+ Entry<String, FileMetadata> entry = i.next();
+ if (entry.getKey().startsWith(prefix)) {
+ dataMap.remove(entry.getKey());
+ i.remove();
+ }
+ }
+ }
+
+ public void dump() throws IOException {
+ System.out.println(metadataMap.values());
+ System.out.println(dataMap.keySet());
+ }
+}
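
Because the store class is package-private, it is only reachable from code in
org.apache.hadoop.fs.s3native, but within that package it can be exercised
directly, which makes the flat key-space model easy to see without the
filesystem wrapper. A rough sketch under that assumption (the class name
InMemoryStoreExample is illustrative):

    package org.apache.hadoop.fs.s3native;

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;

    public class InMemoryStoreExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // retrieve() buffers reads through this directory, so it must be writable.
        conf.set("fs.s3.buffer.dir", System.getProperty("java.io.tmpdir"));

        NativeFileSystemStore store = new InMemoryNativeFileSystemStore();
        store.initialize(URI.create("s3n://bucket"), conf);

        // Keys are flat strings; "directories" exist only as common prefixes.
        store.storeEmptyFile("test/hadoop/file");
        System.out.println(store.retrieveMetadata("test/hadoop/file") != null);

        store.purge("test"); // removes every key under the prefix
      }
    }
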
Added: hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/Jets3tNativeS3FileSystemContractTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/Jets3tNativeS3FileSystemContractTest.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/Jets3tNativeS3FileSystemContractTest.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/Jets3tNativeS3FileSystemContractTest.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import java.io.IOException;
+
+public class Jets3tNativeS3FileSystemContractTest
+ extends NativeS3FileSystemContractBaseTest {
+
+ @Override
+ NativeFileSystemStore getNativeFileSystemStore() throws IOException {
+ return new Jets3tNativeFileSystemStore();
+ }
+}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java?rev=664126&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java Fri Jun 6 14:06:30 2008
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+
+public abstract class NativeS3FileSystemContractBaseTest
+ extends FileSystemContractBaseTest {
+
+ private NativeFileSystemStore store;
+
+ abstract NativeFileSystemStore getNativeFileSystemStore() throws IOException;
+
+ @Override
+ protected void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ store = getNativeFileSystemStore();
+ fs = new NativeS3FileSystem(store);
+ fs.initialize(URI.create(conf.get("test.fs.s3n.name")), conf);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ store.purge("test");
+ super.tearDown();
+ }
+
+ @Override
+ protected boolean renameSupported() {
+ return false;
+ }
+
+ public void testListStatusForRoot() throws Exception {
+ Path testDir = path("/test");
+ assertTrue(fs.mkdirs(testDir));
+
+ FileStatus[] paths = fs.listStatus(path("/"));
+ assertEquals(1, paths.length);
+ assertEquals(path("/test"), paths[0].getPath());
+ }
+
+}
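
Both contract base tests read the URI of the filesystem under test from
configuration: test.fs.s3.name for the block-based S3 filesystem and
test.fs.s3n.name for the native one. The Jets3t-backed contract tests
therefore only run against a real account when those properties are set, for
example in a hadoop-site.xml on the test classpath. A sketch of such entries,
with placeholder bucket names and credentials:

    <property>
      <name>test.fs.s3.name</name>
      <value>s3://ID:SECRET@test-bucket/</value>
    </property>

    <property>
      <name>test.fs.s3n.name</name>
      <value>s3n://ID:SECRET@test-native-bucket/</value>
    </property>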