Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/14 00:12:09 UTC
svn commit: r486886 - in /lucene/hadoop/trunk: ./ conf/ lib/ src/java/org/apache/hadoop/fs/s3/ src/test/ src/test/org/apache/hadoop/fs/s3/
Author: cutting
Date: Wed Dec 13 15:12:07 2006
New Revision: 486886
URL: http://svn.apache.org/viewvc?view=rev&rev=486886
Log:
HADOOP-574. Add an Amazon S3 FileSystem implementation. Contributed by Tom White.
Added:
lucene/hadoop/trunk/lib/commons-codec-1.3.jar (with props)
lucene/hadoop/trunk/lib/commons-httpclient-3.0.1.jar (with props)
lucene/hadoop/trunk/lib/jets3t.jar (with props)
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/test/hadoop-site.xml
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=486886&r1=486885&r2=486886
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Dec 13 15:12:07 2006
@@ -92,6 +92,13 @@
26. HADOOP-454. Add a 'dfs -dus' command that provides summary disk
usage. (Hairong Kuang via cutting)
+27. HADOOP-574. Add an Amazon S3 implementation of FileSystem. To
+ use this, one need only specify paths of the form
+ s3://id:secret@bucket/. Alternatively, the AWS access key id and
+ secret can be specified in your config, with the properties
+ fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey.
+ (Tom White via cutting)
+
Release 0.9.1 - 2006-12-06
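
The change entry above names two ways to pass AWS credentials: embedded in the
s3:// URL, or via configuration properties. A minimal client-side sketch under
that scheme (the property names and the FileSystem.get(URI, Configuration) call
come from this commit; the bucket name and credential values are hypothetical
placeholders):

    Configuration conf = new Configuration();
    // Alternative to embedding id:secret in the s3:// URL itself:
    conf.set("fs.s3.awsAccessKeyId", "MY_ACCESS_KEY_ID");         // placeholder
    conf.set("fs.s3.awsSecretAccessKey", "MY_SECRET_ACCESS_KEY"); // placeholder
    FileSystem fs = FileSystem.get(URI.create("s3://mybucket/"), conf);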
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=486886&r1=486885&r2=486886
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Dec 13 15:12:07 2006
@@ -119,6 +119,12 @@
</property>
<property>
+ <name>fs.s3.impl</name>
+ <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
+ <description>The FileSystem for s3: uris.</description>
+</property>
+
+<property>
<name>dfs.datanode.bindAddress</name>
<value>0.0.0.0</value>
<description>
Added: lucene/hadoop/trunk/lib/commons-codec-1.3.jar
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/commons-codec-1.3.jar?view=auto&rev=486886
==============================================================================
Binary file - no diff available.
Propchange: lucene/hadoop/trunk/lib/commons-codec-1.3.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: lucene/hadoop/trunk/lib/commons-httpclient-3.0.1.jar
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/commons-httpclient-3.0.1.jar?view=auto&rev=486886
==============================================================================
Binary file - no diff available.
Propchange: lucene/hadoop/trunk/lib/commons-httpclient-3.0.1.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: lucene/hadoop/trunk/lib/jets3t.jar
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/jets3t.jar?view=auto&rev=486886
==============================================================================
Binary file - no diff available.
Propchange: lucene/hadoop/trunk/lib/jets3t.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,26 @@
+package org.apache.hadoop.fs.s3;
+
+class Block {
+ private long id;
+
+ private long length;
+
+ public Block(long id, long length) {
+ this.id = id;
+ this.length = length;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public String toString() {
+ return "Block[" + id + ", " + length + "]";
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,40 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+interface FileSystemStore {
+
+ void initialize(URI uri, Configuration conf) throws IOException;
+
+ void storeINode(Path path, INode inode) throws IOException;
+ void storeBlock(Block block, InputStream in) throws IOException;
+
+ boolean inodeExists(Path path) throws IOException;
+ boolean blockExists(long blockId) throws IOException;
+
+ INode getINode(Path path) throws IOException;
+ InputStream getBlockStream(Block block, long byteRangeStart) throws IOException;
+
+ void deleteINode(Path path) throws IOException;
+ void deleteBlock(Block block) throws IOException;
+
+ Set<Path> listSubPaths(Path path) throws IOException;
+
+ /**
+ * Delete everything. Used for testing.
+ * @throws IOException
+ */
+ void purge() throws IOException;
+
+ /**
+ * Diagnostic method to dump all INodes to the console.
+ * @throws IOException
+ */
+ void dump() throws IOException;
+}
\ No newline at end of file
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,99 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Holds file metadata including type (regular file, or directory),
+ * and the list of blocks that are pointers to the data.
+ */
+class INode {
+
+ enum FileType {
+ DIRECTORY, FILE
+ }
+
+ public static final FileType[] FILE_TYPES = {
+ FileType.DIRECTORY,
+ FileType.FILE
+ };
+
+ public static final INode DIRECTORY_INODE = new INode(FileType.DIRECTORY, null);
+
+ private FileType fileType;
+ private Block[] blocks;
+
+ public INode(FileType fileType, Block[] blocks) {
+ this.fileType = fileType;
+ if (isDirectory() && blocks != null) {
+ throw new IllegalArgumentException("A directory cannot contain blocks.");
+ }
+ this.blocks = blocks;
+ }
+
+ public Block[] getBlocks() {
+ return blocks;
+ }
+
+ public FileType getFileType() {
+ return fileType;
+ }
+
+ public boolean isDirectory() {
+ return fileType == FileType.DIRECTORY;
+ }
+
+ public boolean isFile() {
+ return fileType == FileType.FILE;
+ }
+
+ public long getSerializedLength() {
+ return 1L + (blocks == null ? 0 : 4 + blocks.length * 16);
+ }
+
+
+ public InputStream serialize() throws IOException {
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(bytes);
+ out.writeByte(fileType.ordinal());
+ if (isFile()) {
+ out.writeInt(blocks.length);
+ for (int i = 0; i < blocks.length; i++) {
+ out.writeLong(blocks[i].getId());
+ out.writeLong(blocks[i].getLength());
+ }
+ }
+ out.close();
+ return new ByteArrayInputStream(bytes.toByteArray());
+ }
+
+ public static INode deserialize(InputStream in) throws IOException {
+ if (in == null) {
+ return null;
+ }
+ DataInputStream dataIn = new DataInputStream(in);
+ FileType fileType = INode.FILE_TYPES[dataIn.readByte()];
+ switch (fileType) {
+ case DIRECTORY:
+ in.close();
+ return INode.DIRECTORY_INODE;
+ case FILE:
+ int numBlocks = dataIn.readInt();
+ Block[] blocks = new Block[numBlocks];
+ for (int i = 0; i < numBlocks; i++) {
+ long id = dataIn.readLong();
+ long length = dataIn.readLong();
+ blocks[i] = new Block(id, length);
+ }
+ in.close();
+ return new INode(fileType, blocks);
+ default:
+ throw new IllegalArgumentException("Cannot deserialize inode.");
+ }
+ }
+
+}
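
The serialization format above is one type byte, then (for regular files) a
four-byte block count followed by two eight-byte longs per block, which is
exactly what getSerializedLength() computes. A worked example with
hypothetical block values:

    // FILE inode with two blocks:
    // 1 (type byte) + 4 (block count) + 2 * 16 (id and length longs) = 37 bytes
    INode inode = new INode(INode.FileType.FILE, new Block[] {
        new Block(1L, 128L), new Block(2L, 64L) });
    assert inode.getSerializedLength() == 1 + 4 + 2 * 16;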
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,296 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.INode.FileType;
+import org.jets3t.service.S3Service;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.security.AWSCredentials;
+
+class Jets3tFileSystemStore implements FileSystemStore {
+
+ private static final String PATH_DELIMITER = urlEncode(Path.SEPARATOR);
+ private static final String BLOCK_PREFIX = "block_";
+
+ private S3Service s3Service;
+
+ private S3Bucket bucket;
+
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ try {
+ String accessKey = null;
+ String secretAccessKey = null;
+ String userInfo = uri.getUserInfo();
+ if (userInfo != null) {
+ int index = userInfo.indexOf(':');
+ if (index != -1) {
+ accessKey = userInfo.substring(0, index);
+ secretAccessKey = userInfo.substring(index + 1);
+ } else {
+ accessKey = userInfo;
+ }
+ }
+ if (accessKey == null) {
+ accessKey = conf.get("fs.s3.awsAccessKeyId");
+ }
+ if (secretAccessKey == null) {
+ secretAccessKey = conf.get("fs.s3.awsSecretAccessKey");
+ }
+ if (accessKey == null && secretAccessKey == null) {
+ throw new IllegalArgumentException("AWS " +
+ "Access Key ID and Secret Access Key " +
+ "must be specified as the username " +
+ "or password (respectively) of a s3 URL, " +
+ "or by setting the " +
+ "fs.s3.awsAccessKeyId or " +
+ "fs.s3.awsSecretAccessKey properties (respectively).");
+ } else if (accessKey == null) {
+ throw new IllegalArgumentException("AWS " +
+ "Access Key ID must be specified " +
+ "as the username of a s3 URL, or by setting the " +
+ "fs.s3.awsAccessKeyId property.");
+ } else if (secretAccessKey == null) {
+ throw new IllegalArgumentException("AWS " +
+ "Secret Access Key must be specified " +
+ "as the password of a s3 URL, or by setting the " +
+ "fs.s3.awsSecretAccessKey property.");
+ }
+ AWSCredentials awsCredentials = new AWSCredentials(accessKey, secretAccessKey);
+ this.s3Service = new RestS3Service(awsCredentials);
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ bucket = new S3Bucket(uri.getHost());
+
+ createBucket(bucket.getName());
+ }
+
+ private void createBucket(String bucketName) throws IOException {
+ try {
+ s3Service.createBucket(bucketName);
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ private void delete(String key) throws IOException {
+ try {
+ s3Service.deleteObject(bucket, key);
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ public void deleteINode(Path path) throws IOException {
+ delete(pathToKey(path));
+ }
+
+ public void deleteBlock(Block block) throws IOException {
+ delete(blockToKey(block));
+ }
+
+ public boolean inodeExists(Path path) throws IOException {
+ InputStream in = get(pathToKey(path));
+ if (in == null) {
+ return false;
+ }
+ in.close();
+ return true;
+ }
+
+ public boolean blockExists(long blockId) throws IOException {
+ InputStream in = get(blockToKey(blockId));
+ if (in == null) {
+ return false;
+ }
+ in.close();
+ return true;
+ }
+
+ private InputStream get(String key) throws IOException {
+ try {
+ S3Object object = s3Service.getObject(bucket, key);
+ return object.getDataInputStream();
+ } catch (S3ServiceException e) {
+ if (e.getErrorCode().equals("NoSuchKey")) {
+ return null;
+ }
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ private InputStream get(String key, long byteRangeStart) throws IOException {
+ try {
+ S3Object object = s3Service.getObject(bucket, key, null, null, null,
+ null, byteRangeStart, null);
+ return object.getDataInputStream();
+ } catch (S3ServiceException e) {
+ if (e.getErrorCode().equals("NoSuchKey")) {
+ return null;
+ }
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ public INode getINode(Path path) throws IOException {
+ return INode.deserialize(get(pathToKey(path)));
+ }
+
+ public InputStream getBlockStream(Block block, long byteRangeStart)
+ throws IOException {
+ return get(blockToKey(block), byteRangeStart);
+ }
+
+ public Set<Path> listSubPaths(Path path) throws IOException {
+ try {
+ String prefix = pathToKey(path);
+ if (!prefix.endsWith(PATH_DELIMITER)) {
+ prefix += PATH_DELIMITER;
+ }
+ S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER, 0);
+ Set<Path> prefixes = new TreeSet<Path>();
+ for (int i = 0; i < objects.length; i++) {
+ prefixes.add(keyToPath(objects[i].getKey()));
+ }
+ prefixes.remove(path);
+ return prefixes;
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ private void put(String key, InputStream in, long length) throws IOException {
+ try {
+ S3Object object = new S3Object(key);
+ object.setDataInputStream(in);
+ object.setContentType("binary/octet-stream");
+ object.setContentLength(length);
+ s3Service.putObject(bucket, object);
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ public void storeINode(Path path, INode inode) throws IOException {
+ put(pathToKey(path), inode.serialize(), inode.getSerializedLength());
+ }
+
+ public void storeBlock(Block block, InputStream in) throws IOException {
+ put(blockToKey(block), in, block.getLength());
+ }
+
+ private String pathToKey(Path path) {
+ if (!path.isAbsolute()) {
+ throw new IllegalArgumentException("Path must be absolute: " + path);
+ }
+ return urlEncode(path.toString());
+ }
+
+ private Path keyToPath(String key) {
+ return new Path(urlDecode(key));
+ }
+
+ private static String urlEncode(String s) {
+ try {
+ return URLEncoder.encode(s, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ // Should never happen since every implementation of the Java Platform
+ // is required to support UTF-8.
+ // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private static String urlDecode(String s) {
+ try {
+ return URLDecoder.decode(s, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ // Should never happen since every implementation of the Java Platform
+ // is required to support UTF-8.
+ // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private String blockToKey(long blockId) {
+ return BLOCK_PREFIX + blockId;
+ }
+
+ private String blockToKey(Block block) {
+ return blockToKey(block.getId());
+ }
+
+ public void purge() throws IOException {
+ try {
+ S3Object[] objects = s3Service.listObjects(bucket);
+ for (int i = 0; i < objects.length; i++) {
+ s3Service.deleteObject(bucket, objects[i].getKey());
+ }
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ }
+
+ public void dump() throws IOException {
+ StringBuilder sb = new StringBuilder("S3 Filesystem, ");
+ sb.append(bucket.getName()).append("\n");
+ try {
+ S3Object[] objects = s3Service.listObjects(bucket, PATH_DELIMITER, null);
+ for (int i = 0; i < objects.length; i++) {
+ Path path = keyToPath(objects[i].getKey());
+ sb.append(path).append("\n");
+ INode m = getINode(path);
+ sb.append("\t").append(m.getFileType()).append("\n");
+ if (m.getFileType() == FileType.DIRECTORY) {
+ continue;
+ }
+ for (int j = 0; j < m.getBlocks().length; j++) {
+ sb.append("\t").append(m.getBlocks()[j]).append("\n");
+ }
+ }
+ } catch (S3ServiceException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new S3Exception(e);
+ }
+ System.out.println(sb);
+ }
+
+}
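
A note on the key scheme used by this store: inode keys are the URL-encoded
absolute path (so the encoded separator "%2F" can serve as the listing
delimiter), and block keys are "block_" plus the block id. A small worked
example, with a hypothetical path and block id:

    // The same encoding pathToKey applies to an inode key:
    String inodeKey = URLEncoder.encode("/dir1/file1", "UTF-8"); // "%2Fdir1%2Ffile1"
    // The form blockToKey produces for a block key:
    String blockKey = "block_" + 6415776850131549260L; // "block_6415776850131549260"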
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,12 @@
+package org.apache.hadoop.fs.s3;
+
+/**
+ * Thrown if there is a problem communicating with Amazon S3.
+ */
+public class S3Exception extends RuntimeException {
+
+ public S3Exception(Throwable t) {
+ super(t);
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,320 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * <p>
+ * A {@link FileSystem} backed by <a href="http://aws.amazon.com/s3">Amazon S3</a>.
+ * </p>
+ * @author Tom White
+ */
+public class S3FileSystem extends FileSystem {
+
+ private static final long DEFAULT_BLOCK_SIZE = 1 * 1024 * 1024;
+
+ private URI uri;
+
+ private FileSystemStore store;
+
+ private FileSystem localFs;
+
+ private Path workingDir = new Path("/user", System.getProperty("user.name"));
+
+ public S3FileSystem() {
+ this(new Jets3tFileSystemStore());
+ }
+
+ public S3FileSystem(FileSystemStore store) {
+ this.store = store;
+ }
+
+ @Override
+ public URI getUri() {
+ return uri;
+ }
+
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ store.initialize(uri, conf);
+ setConf(conf);
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+ this.localFs = get(URI.create("file:///"), conf);
+ }
+
+ @Override
+ public String getName() {
+ return getUri().toString();
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return workingDir;
+ }
+
+ @Override
+ public void setWorkingDirectory(Path dir) {
+ workingDir = makeAbsolute(dir);
+ }
+
+ private Path makeAbsolute(Path path) {
+ if (path.isAbsolute()) {
+ return path;
+ }
+ return new Path(workingDir, path);
+ }
+
+ @Override
+ public boolean exists(Path path) throws IOException {
+ return store.inodeExists(makeAbsolute(path));
+ }
+
+ @Override
+ public boolean mkdirs(Path path) throws IOException {
+ Path absolutePath = makeAbsolute(path);
+ INode inode = store.getINode(absolutePath);
+ if (inode == null) {
+ store.storeINode(absolutePath, INode.DIRECTORY_INODE);
+ } else if (inode.isFile()) {
+ throw new IOException(String.format(
+ "Can't make directory for path %s since it is a file.", path));
+ }
+ Path parent = path.getParent();
+ return (parent == null || mkdirs(parent));
+ }
+
+ @Override
+ public boolean isDirectory(Path path) throws IOException {
+ INode inode = store.getINode(makeAbsolute(path));
+ if (inode == null) {
+ return false;
+ }
+ return inode.isDirectory();
+ }
+
+ @Override
+ public boolean isFile(Path path) throws IOException {
+ INode inode = store.getINode(makeAbsolute(path));
+ if (inode == null) {
+ return false;
+ }
+ return inode.isFile();
+ }
+
+ private INode checkFile(Path path) throws IOException {
+ INode inode = store.getINode(makeAbsolute(path));
+ if (inode == null) {
+ throw new IOException("No such file.");
+ }
+ if (inode.isDirectory()) {
+ throw new IOException("Path " + path + " is a directory.");
+ }
+ return inode;
+ }
+
+ @Override
+ public Path[] listPathsRaw(Path path) throws IOException {
+ INode inode = store.getINode(makeAbsolute(path));
+ if (inode == null) {
+ return null;
+ } else if (inode.isFile()) {
+ return new Path[] { path };
+ } else { // directory
+ Set<Path> paths = store.listSubPaths(path);
+ return paths.toArray(new Path[0]);
+ }
+ }
+
+ @Override
+ public FSOutputStream createRaw(Path file, boolean overwrite,
+ short replication, long blockSize) throws IOException {
+
+ return createRaw(file, overwrite, replication, blockSize, null);
+ }
+
+ @Override
+ public FSOutputStream createRaw(Path file, boolean overwrite,
+ short replication, long blockSize, Progressable progress)
+ throws IOException {
+
+ if (!isDirectory(file.getParent())) {
+ throw new IOException("Cannot create file " + file
+ + " since parent directory does not exist.");
+ }
+ INode inode = store.getINode(makeAbsolute(file));
+ if (inode != null) {
+ if (overwrite) {
+ deleteRaw(file);
+ } else {
+ throw new IOException("File already exists: " + file);
+ }
+ }
+ return new S3OutputStream(getConf(), store, makeAbsolute(file),
+ blockSize, progress);
+ }
+
+ @Override
+ public FSInputStream openRaw(Path path) throws IOException {
+ INode inode = checkFile(path);
+ return new S3InputStream(getConf(), store, inode);
+ }
+
+ @Override
+ public boolean renameRaw(Path src, Path dst) throws IOException {
+ // TODO: Check corner cases: dst already exists,
+ // or if path is directory with children
+ Path absoluteSrc = makeAbsolute(src);
+ INode inode = store.getINode(absoluteSrc);
+ if (inode == null) {
+ throw new IOException("No such file.");
+ }
+ store.storeINode(makeAbsolute(dst), inode);
+ store.deleteINode(absoluteSrc);
+ return true;
+ }
+
+ @Override
+ public boolean deleteRaw(Path path) throws IOException {
+ // TODO: Check if path is directory with children
+ Path absolutePath = makeAbsolute(path);
+ INode inode = store.getINode(absolutePath);
+ if (inode == null) {
+ throw new IOException("No such file or directory.");
+ }
+ store.deleteINode(absolutePath);
+ if (inode.isFile()) {
+ for (Block block : inode.getBlocks()) {
+ store.deleteBlock(block);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public long getLength(Path path) throws IOException {
+ INode inode = checkFile(path);
+ long length = 0;
+ for (Block block : inode.getBlocks()) {
+ length += block.getLength();
+ }
+ return length;
+ }
+
+ /**
+ * Replication is not supported for S3 file systems since S3 handles it for
+ * us.
+ */
+ @Override
+ public short getReplication(Path path) throws IOException {
+ return 1;
+ }
+
+ @Override
+ public short getDefaultReplication() {
+ return 1;
+ }
+
+ /**
+ * Replication is not supported for S3 file systems since S3 handles it for
+ * us.
+ */
+ @Override
+ public boolean setReplicationRaw(Path path, short replication)
+ throws IOException {
+ return true;
+ }
+
+ @Override
+ public long getBlockSize(Path path) throws IOException {
+ INode inode = store.getINode(makeAbsolute(path));
+ if (inode == null) {
+ throw new IOException("No such file or directory.");
+ }
+ Block[] blocks = inode.getBlocks();
+ if (blocks == null || blocks.length == 0) {
+ return 0;
+ }
+ return blocks[0].getLength();
+ }
+
+ @Override
+ public long getDefaultBlockSize() {
+ return getConf().getLong("fs.s3.block.size", DEFAULT_BLOCK_SIZE);
+ }
+
+ /**
+ * Return a 1x1 'localhost' cell if the file exists; return null otherwise.
+ */
+ @Override
+ public String[][] getFileCacheHints(Path f, long start, long len)
+ throws IOException {
+ // TODO: Check this is the correct behavior
+ if (!exists(f)) {
+ return null;
+ }
+ return new String[][] { { "localhost" } };
+ }
+
+ @Override
+ public void lock(Path path, boolean shared) throws IOException {
+ // TODO: Design and implement
+ }
+
+ @Override
+ public void release(Path path) throws IOException {
+ // TODO: Design and implement
+ }
+
+ @Override
+ public void reportChecksumFailure(Path path, FSInputStream in,
+ long start, long length, int crc) {
+ // TODO: What to do here?
+ }
+
+ @Override
+ public void moveFromLocalFile(Path src, Path dst) throws IOException {
+ FileUtil.copy(localFs, src, this, dst, true, getConf());
+ }
+
+ @Override
+ public void copyFromLocalFile(Path src, Path dst) throws IOException {
+ FileUtil.copy(localFs, src, this, dst, false, true, getConf());
+ }
+
+ @Override
+ public void copyToLocalFile(Path src, Path dst, boolean copyCrc) throws IOException {
+ FileUtil.copy(this, src, localFs, dst, false, copyCrc, getConf());
+ }
+
+ @Override
+ public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ return tmpLocalFile;
+ }
+
+ @Override
+ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ moveFromLocalFile(tmpLocalFile, fsOutputFile);
+ }
+
+ // diagnostic methods
+
+ void dump() throws IOException {
+ store.dump();
+ }
+
+ void purge() throws IOException {
+ store.purge();
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,176 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+
+class S3InputStream extends FSInputStream {
+
+ private int bufferSize;
+
+ private FileSystemStore store;
+
+ private Block[] blocks;
+
+ private boolean closed;
+
+ private long fileLength;
+
+ private long pos = 0;
+
+ private DataInputStream blockStream;
+
+ private long blockEnd = -1;
+
+ public S3InputStream(Configuration conf, FileSystemStore store,
+ INode inode) {
+
+ this.store = store;
+ this.blocks = inode.getBlocks();
+ for (Block block : blocks) {
+ this.fileLength += block.getLength();
+ }
+ this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
+ }
+
+ @Override
+ public synchronized long getPos() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public synchronized int available() throws IOException {
+ return (int) (fileLength - pos);
+ }
+
+ @Override
+ public synchronized void seek(long targetPos) throws IOException {
+ if (targetPos > fileLength) {
+ throw new IOException("Cannot seek after EOF");
+ }
+ pos = targetPos;
+ blockEnd = -1;
+ }
+
+ @Override
+ public synchronized int read() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ int result = -1;
+ if (pos < fileLength) {
+ if (pos > blockEnd) {
+ blockSeekTo(pos);
+ }
+ result = blockStream.read();
+ if (result >= 0) {
+ pos++;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public synchronized int read(byte buf[], int off, int len) throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ if (pos < fileLength) {
+ if (pos > blockEnd) {
+ blockSeekTo(pos);
+ }
+ int realLen = Math.min(len, (int) (blockEnd - pos + 1));
+ int result = blockStream.read(buf, off, realLen);
+ if (result >= 0) {
+ pos += result;
+ }
+ return result;
+ }
+ return -1;
+ }
+
+ private synchronized void blockSeekTo(long target) throws IOException {
+ //
+ // Compute desired block
+ //
+ int targetBlock = -1;
+ long targetBlockStart = 0;
+ long targetBlockEnd = 0;
+ for (int i = 0; i < blocks.length; i++) {
+ long blockLength = blocks[i].getLength();
+ targetBlockEnd = targetBlockStart + blockLength - 1;
+
+ if (target >= targetBlockStart && target <= targetBlockEnd) {
+ targetBlock = i;
+ break;
+ } else {
+ targetBlockStart = targetBlockEnd + 1;
+ }
+ }
+ if (targetBlock < 0) {
+ throw new IOException(
+ "Impossible situation: could not find target position " + target);
+ }
+ long offsetIntoBlock = target - targetBlockStart;
+
+ // read block blocks[targetBlock] from position offsetIntoBlock
+
+ File fileBlock = File.createTempFile("s3fs-in", "");
+ fileBlock.deleteOnExit();
+ InputStream in = store.getBlockStream(blocks[targetBlock], offsetIntoBlock);
+ OutputStream out = new BufferedOutputStream(new FileOutputStream(fileBlock));
+ byte[] buf = new byte[bufferSize];
+ int numRead;
+ while ((numRead = in.read(buf)) >= 0) {
+ out.write(buf, 0, numRead);
+ }
+ out.close();
+ in.close();
+
+ this.pos = target;
+ this.blockEnd = targetBlockEnd;
+ this.blockStream = new DataInputStream(new FileInputStream(fileBlock));
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ if (blockStream != null) {
+ blockStream.close();
+ blockStream = null;
+ }
+ super.close();
+ closed = true;
+ }
+
+ /**
+ * We don't support marks.
+ */
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ @Override
+ public void mark(int readLimit) {
+ // Do nothing
+ }
+
+ @Override
+ public void reset() throws IOException {
+ throw new IOException("Mark not supported");
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,199 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.INode.FileType;
+import org.apache.hadoop.util.Progressable;
+
+class S3OutputStream extends FSOutputStream {
+
+ private int bufferSize;
+
+ private FileSystemStore store;
+
+ private Path path;
+
+ private long blockSize;
+
+ private File backupFile;
+
+ private OutputStream backupStream;
+
+ private Random r = new Random();
+
+ private boolean closed;
+
+ private int pos = 0;
+
+ private long filePos = 0;
+
+ private int bytesWrittenToBlock = 0;
+
+ private byte[] outBuf;
+
+ private List<Block> blocks = new ArrayList<Block>();
+
+ private Block nextBlock;
+
+ public S3OutputStream(Configuration conf, FileSystemStore store,
+ Path path, long blockSize, Progressable progress) throws IOException {
+
+ this.store = store;
+ this.path = path;
+ this.blockSize = blockSize;
+ this.backupFile = newBackupFile();
+ this.backupStream = new FileOutputStream(backupFile);
+ this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
+ this.outBuf = new byte[bufferSize];
+
+ }
+
+ private File newBackupFile() throws IOException {
+ File result = File.createTempFile("s3fs-out", "");
+ result.deleteOnExit();
+ return result;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return filePos;
+ }
+
+ @Override
+ public synchronized void write(int b) throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+
+ if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) {
+ flush();
+ }
+ outBuf[pos++] = (byte) b;
+ filePos++;
+ }
+
+ @Override
+ public synchronized void write(byte b[], int off, int len) throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ while (len > 0) {
+ int remaining = bufferSize - pos;
+ int toWrite = Math.min(remaining, len);
+ System.arraycopy(b, off, outBuf, pos, toWrite);
+ pos += toWrite;
+ off += toWrite;
+ len -= toWrite;
+ filePos += toWrite;
+
+ if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) {
+ flush();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+
+ if (bytesWrittenToBlock + pos >= blockSize) {
+ flushData((int) blockSize - bytesWrittenToBlock);
+ }
+ if (bytesWrittenToBlock == blockSize) {
+ endBlock();
+ }
+ flushData(pos);
+ }
+
+ private synchronized void flushData(int maxPos) throws IOException {
+ int workingPos = Math.min(pos, maxPos);
+
+ if (workingPos > 0) {
+ //
+ // To the local block backup, write just the bytes
+ //
+ backupStream.write(outBuf, 0, workingPos);
+
+ //
+ // Track position
+ //
+ bytesWrittenToBlock += workingPos;
+ System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
+ pos -= workingPos;
+ }
+ }
+
+ private synchronized void endBlock() throws IOException {
+ //
+ // Done with local copy
+ //
+ backupStream.close();
+
+ //
+ // Send it to S3
+ //
+ // TODO: Use passed in Progressable to report progress.
+ nextBlockOutputStream();
+ InputStream in = new FileInputStream(backupFile);
+ store.storeBlock(nextBlock, in);
+ in.close();
+ internalClose();
+
+ //
+ // Delete local backup, start new one
+ //
+ backupFile.delete();
+ backupFile = newBackupFile();
+ backupStream = new FileOutputStream(backupFile);
+ bytesWrittenToBlock = 0;
+ }
+
+ private synchronized void nextBlockOutputStream() throws IOException {
+ long blockId = r.nextLong();
+ while (store.blockExists(blockId)) {
+ blockId = r.nextLong();
+ }
+ nextBlock = new Block(blockId, bytesWrittenToBlock);
+ blocks.add(nextBlock);
+ bytesWrittenToBlock = 0;
+ }
+
+ private synchronized void internalClose() throws IOException {
+ INode inode = new INode(FileType.FILE, blocks.toArray(new Block[blocks
+ .size()]));
+ store.storeINode(path, inode);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+
+ flush();
+ if (filePos == 0 || bytesWrittenToBlock != 0) {
+ endBlock();
+ }
+
+ backupStream.close();
+ backupFile.delete();
+
+ super.close();
+
+ closed = true;
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html Wed Dec 13 15:12:07 2006
@@ -0,0 +1,34 @@
+<html>
+<body>
+
+<p>A distributed implementation of {@link
+org.apache.hadoop.fs.FileSystem} that uses <a href="http://aws.amazon.com/s3">Amazon S3</a>.</p>
+
+<p>
+Files are stored in S3 as blocks (represented by {@link Block}), which have an ID and a length.
+Block metadata is stored in S3 as a small record (represented by {@link INode}) using the URL-encoded
+path string as a key. Inodes record the file type (regular file or directory) and the list of blocks.
+This design makes it easy to seek to any given position in a file by reading the inode data to compute
+which block to access, then using S3's support for
+<a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35.2">HTTP Range</a> headers
+to start streaming from the correct position.
+Renames are also efficient since only the inode is moved (via a DELETE followed by a PUT,
+because S3 does not support renames).
+</p>
+<p>
+For a single file <i>/dir1/file1</i>, which takes two blocks of storage, the file structure in S3
+would be something like this:
+</p>
+<pre>
+/
+/dir1
+/dir1/file1
+block_6415776850131549260
+block_3026438247347758425
+</pre>
+<p>
+Inodes start with a leading <code>/</code>, while blocks are prefixed with <code>block_</code>.
+</p>
+
+</body>
+</html>
Modified: lucene/hadoop/trunk/src/test/hadoop-site.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/hadoop-site.xml?view=diff&rev=486886&r1=486885&r2=486886
==============================================================================
--- lucene/hadoop/trunk/src/test/hadoop-site.xml (original)
+++ lucene/hadoop/trunk/src/test/hadoop-site.xml Wed Dec 13 15:12:07 2006
@@ -14,5 +14,16 @@
<description>A base for other temporary directories.</description>
</property>
+<property>
+ <name>test.fs.s3.name</name>
+ <value>s3:///</value>
+ <description>The name of the s3 file system for testing.</description>
+</property>
+
+<property>
+ <name>fs.s3.block.size</name>
+ <value>128</value>
+ <description>Size of a block in bytes.</description>
+</property>
</configuration>
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,108 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.INode.FileType;
+
+/**
+ * A stub implementation of {@link FileSystemStore} for testing
+ * {@link S3FileSystem} without actually connecting to S3.
+ * @author Tom White
+ */
+class InMemoryFileSystemStore implements FileSystemStore {
+
+ private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>();
+ private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>();
+
+ public void initialize(URI uri, Configuration conf) {
+ // Nothing to initialize
+ }
+
+ public void deleteINode(Path path) throws IOException {
+ inodes.remove(path);
+ }
+
+ public void deleteBlock(Block block) throws IOException {
+ blocks.remove(block.getId());
+ }
+
+ public boolean inodeExists(Path path) throws IOException {
+ return inodes.containsKey(path);
+ }
+
+ public boolean blockExists(long blockId) throws IOException {
+ return blocks.containsKey(blockId);
+ }
+
+ public INode getINode(Path path) throws IOException {
+ return inodes.get(path);
+ }
+
+ public InputStream getBlockStream(Block block, long byteRangeStart) throws IOException {
+ byte[] data = blocks.get(block.getId());
+ return new ByteArrayInputStream(data, (int) byteRangeStart, data.length - (int) byteRangeStart);
+ }
+
+ public Set<Path> listSubPaths(Path path) throws IOException {
+ // This is inefficient but more than adequate for testing purposes.
+ Set<Path> subPaths = new LinkedHashSet<Path>();
+ for (Path p : inodes.tailMap(path).keySet()) {
+ if (path.equals(p.getParent())) {
+ subPaths.add(p);
+ }
+ }
+ return subPaths;
+ }
+
+ public void storeINode(Path path, INode inode) throws IOException {
+ inodes.put(path, inode);
+ }
+
+ public void storeBlock(Block block, InputStream in) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] buf = new byte[8192];
+ int numRead;
+ while ((numRead = in.read(buf)) >= 0) {
+ out.write(buf, 0, numRead);
+ }
+ blocks.put(block.getId(), out.toByteArray());
+ }
+
+ public void purge() throws IOException {
+ inodes.clear();
+ blocks.clear();
+ }
+
+ public void dump() throws IOException {
+ StringBuilder sb = new StringBuilder(getClass().getSimpleName());
+ sb.append(", \n");
+ for (Map.Entry<Path, INode> entry : inodes.entrySet()) {
+ sb.append(entry.getKey()).append("\n");
+ INode inode = entry.getValue();
+ sb.append("\t").append(inode.getFileType()).append("\n");
+ if (inode.getFileType() == FileType.DIRECTORY) {
+ continue;
+ }
+ for (int j = 0; j < inode.getBlocks().length; j++) {
+ sb.append("\t").append(inode.getBlocks()[j]).append("\n");
+ }
+ }
+ System.out.println(sb);
+
+ System.out.println(inodes.keySet());
+ System.out.println(blocks.keySet());
+ }
+
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,12 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+
+public class Jets3tS3FileSystemTest extends S3FileSystemBaseTest {
+
+ @Override
+ public FileSystemStore getFileSystemStore() throws IOException {
+ return new Jets3tFileSystemStore();
+ }
+
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,232 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.Path;
+
+public abstract class S3FileSystemBaseTest extends TestCase {
+
+ private static final int BLOCK_SIZE = 128;
+
+ private S3FileSystem s3FileSystem;
+
+ private byte[] data;
+
+ abstract FileSystemStore getFileSystemStore() throws IOException;
+
+ @Override
+ protected void setUp() throws IOException {
+ Configuration conf = new Configuration();
+
+ s3FileSystem = new S3FileSystem(getFileSystemStore());
+ s3FileSystem.initialize(URI.create(conf.get("test.fs.s3.name")), conf);
+
+ data = new byte[BLOCK_SIZE * 2];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = (byte) (i % 10);
+ }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ s3FileSystem.purge();
+ s3FileSystem.close();
+ }
+
+ public void testWorkingDirectory() throws Exception {
+
+ Path homeDir = new Path("/user/", System.getProperty("user.name"));
+ assertEquals(homeDir, s3FileSystem.getWorkingDirectory());
+
+ s3FileSystem.setWorkingDirectory(new Path("."));
+ assertEquals(homeDir, s3FileSystem.getWorkingDirectory());
+
+ s3FileSystem.setWorkingDirectory(new Path(".."));
+ assertEquals(new Path("/user/"), s3FileSystem.getWorkingDirectory());
+
+ s3FileSystem.setWorkingDirectory(new Path("hadoop"));
+ assertEquals(new Path("/user/hadoop"), s3FileSystem.getWorkingDirectory());
+
+ s3FileSystem.setWorkingDirectory(new Path("/test/hadoop"));
+ assertEquals(new Path("/test/hadoop"), s3FileSystem.getWorkingDirectory());
+
+ }
+
+ public void testMkdirs() throws Exception {
+ Path testDir = new Path("/test/hadoop");
+ assertFalse(s3FileSystem.exists(testDir));
+ assertFalse(s3FileSystem.isDirectory(testDir));
+ assertFalse(s3FileSystem.isFile(testDir));
+
+ assertTrue(s3FileSystem.mkdirs(testDir));
+
+ assertTrue(s3FileSystem.exists(testDir));
+ assertTrue(s3FileSystem.isDirectory(testDir));
+ assertFalse(s3FileSystem.isFile(testDir));
+
+ Path parentDir = testDir.getParent();
+ assertTrue(s3FileSystem.exists(parentDir));
+ assertTrue(s3FileSystem.isDirectory(parentDir));
+ assertFalse(s3FileSystem.isFile(parentDir));
+
+ Path grandparentDir = parentDir.getParent();
+ assertTrue(s3FileSystem.exists(grandparentDir));
+ assertTrue(s3FileSystem.isDirectory(grandparentDir));
+ assertFalse(s3FileSystem.isFile(grandparentDir));
+ }
+
+ public void testListPathsRaw() throws Exception {
+ Path[] testDirs = { new Path("/test/hadoop/a"), new Path("/test/hadoop/b"),
+ new Path("/test/hadoop/c/1"), };
+ assertNull(s3FileSystem.listPathsRaw(testDirs[0]));
+
+ for (Path path : testDirs) {
+ assertTrue(s3FileSystem.mkdirs(path));
+ }
+
+ Path[] paths = s3FileSystem.listPathsRaw(new Path("/"));
+
+ assertEquals(1, paths.length);
+ assertEquals(new Path("/test"), paths[0]);
+
+ paths = s3FileSystem.listPathsRaw(new Path("/test"));
+ assertEquals(1, paths.length);
+ assertEquals(new Path("/test/hadoop"), paths[0]);
+
+ paths = s3FileSystem.listPathsRaw(new Path("/test/hadoop"));
+ assertEquals(3, paths.length);
+ assertEquals(new Path("/test/hadoop/a"), paths[0]);
+ assertEquals(new Path("/test/hadoop/b"), paths[1]);
+ assertEquals(new Path("/test/hadoop/c"), paths[2]);
+
+ paths = s3FileSystem.listPathsRaw(new Path("/test/hadoop/a"));
+ assertEquals(0, paths.length);
+ }
+
+ public void testWriteReadAndDeleteEmptyFile() throws Exception {
+ writeReadAndDelete(0);
+ }
+
+ public void testWriteReadAndDeleteHalfABlock() throws Exception {
+ writeReadAndDelete(BLOCK_SIZE / 2);
+ }
+
+ public void testWriteReadAndDeleteOneBlock() throws Exception {
+ writeReadAndDelete(BLOCK_SIZE);
+ }
+
+ public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
+ writeReadAndDelete(BLOCK_SIZE + BLOCK_SIZE / 2);
+ }
+
+ public void testWriteReadAndDeleteTwoBlocks() throws Exception {
+ writeReadAndDelete(BLOCK_SIZE * 2);
+ }
+
+
+ private void writeReadAndDelete(int len) throws IOException {
+ Path path = new Path("/test/hadoop/file");
+
+ s3FileSystem.mkdirs(path.getParent());
+
+ FSOutputStream out = s3FileSystem.createRaw(path, false, (short) 1, BLOCK_SIZE);
+ out.write(data, 0, len);
+ out.close();
+
+ assertTrue("Exists", s3FileSystem.exists(path));
+
+ assertEquals("Block size", Math.min(len, BLOCK_SIZE), s3FileSystem.getBlockSize(path));
+
+ assertEquals("Length", len, s3FileSystem.getLength(path));
+
+ FSInputStream in = s3FileSystem.openRaw(path);
+ byte[] buf = new byte[len];
+
+ in.readFully(0, buf);
+
+ assertEquals(len, buf.length);
+ for (int i = 0; i < buf.length; i++) {
+ assertEquals("Position " + i, data[i], buf[i]);
+ }
+
+ assertTrue("Deleted", s3FileSystem.deleteRaw(path));
+
+ assertFalse("No longer exists", s3FileSystem.exists(path));
+
+ }
+
+ public void testOverwrite() throws IOException {
+ Path path = new Path("/test/hadoop/file");
+
+ s3FileSystem.mkdirs(path.getParent());
+
+ FSOutputStream out = s3FileSystem.createRaw(path, false, (short) 1, BLOCK_SIZE);
+ out.write(data, 0, BLOCK_SIZE);
+ out.close();
+
+ assertTrue("Exists", s3FileSystem.exists(path));
+ assertEquals("Length", BLOCK_SIZE, s3FileSystem.getLength(path));
+
+ try {
+ s3FileSystem.createRaw(path, false, (short) 1, 128);
+ fail("Should throw IOException.");
+ } catch (IOException e) {
+ // Expected
+ }
+
+ out = s3FileSystem.createRaw(path, true, (short) 1, BLOCK_SIZE);
+ out.write(data, 0, BLOCK_SIZE / 2);
+ out.close();
+
+ assertTrue("Exists", s3FileSystem.exists(path));
+ assertEquals("Length", BLOCK_SIZE / 2, s3FileSystem.getLength(path));
+
+ }
+
+ public void testWriteInNonExistentDirectory() {
+ Path path = new Path("/test/hadoop/file");
+ try {
+ s3FileSystem.createRaw(path, false, (short) 1, 128);
+ fail("Should throw IOException.");
+ } catch (IOException e) {
+ // Expected
+ }
+ }
+
+ public void testRename() throws Exception {
+ int len = BLOCK_SIZE;
+
+ Path path = new Path("/test/hadoop/file");
+
+ s3FileSystem.mkdirs(path.getParent());
+
+ FSOutputStream out = s3FileSystem.createRaw(path, false, (short) 1, BLOCK_SIZE);
+ out.write(data, 0, len);
+ out.close();
+
+ assertTrue("Exists", s3FileSystem.exists(path));
+
+ Path newPath = new Path("/test/hadoop/newfile");
+ s3FileSystem.rename(path, newPath);
+ assertFalse("No longer exists", s3FileSystem.exists(path));
+ assertTrue("Moved", s3FileSystem.exists(newPath));
+
+ FSInputStream in = s3FileSystem.openRaw(newPath);
+ byte[] buf = new byte[len];
+
+ in.readFully(0, buf);
+
+ assertEquals(len, buf.length);
+ for (int i = 0; i < buf.length; i++) {
+ assertEquals("Position " + i, data[i], buf[i]);
+ }
+ }
+
+
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,42 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.s3.INode.FileType;
+
+public class TestINode extends TestCase {
+
+ public void testSerializeFileWithSingleBlock() throws IOException {
+ Block[] blocks = { new Block(849282477840258181L, 128L) };
+ INode inode = new INode(FileType.FILE, blocks);
+
+ assertEquals("Length", 1L + 4 + 16, inode.getSerializedLength());
+ InputStream in = inode.serialize();
+
+ INode deserialized = INode.deserialize(in);
+
+ assertEquals("FileType", inode.getFileType(), deserialized.getFileType());
+ Block[] deserializedBlocks = deserialized.getBlocks();
+ assertEquals("Length", 1, deserializedBlocks.length);
+ assertEquals("Id", blocks[0].getId(), deserializedBlocks[0].getId());
+ assertEquals("Length", blocks[0].getLength(), deserializedBlocks[0]
+ .getLength());
+
+ }
+
+ public void testSerializeDirectory() throws IOException {
+ INode inode = INode.DIRECTORY_INODE;
+ assertEquals("Length", 1L, inode.getSerializedLength());
+ InputStream in = inode.serialize();
+ INode deserialized = INode.deserialize(in);
+ assertSame(INode.DIRECTORY_INODE, deserialized);
+ }
+
+ public void testDeserializeNull() throws IOException {
+ assertNull(INode.deserialize(null));
+ }
+
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,33 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class TestInMemoryS3FileSystem extends S3FileSystemBaseTest {
+
+ @Override
+ public FileSystemStore getFileSystemStore() throws IOException {
+ return new InMemoryFileSystemStore();
+ }
+
+ public void testInitialization() throws IOException {
+ initializationTest("s3://a:b@c", "s3://a:b@c");
+ initializationTest("s3://a:b@c/", "s3://a:b@c");
+ initializationTest("s3://a:b@c/path", "s3://a:b@c");
+ initializationTest("s3://a@c", "s3://a@c");
+ initializationTest("s3://a@c/", "s3://a@c");
+ initializationTest("s3://a@c/path", "s3://a@c");
+ initializationTest("s3://c", "s3://c");
+ initializationTest("s3://c/", "s3://c");
+ initializationTest("s3://c/path", "s3://c");
+ }
+
+ private void initializationTest(String initializationUri, String expectedUri) throws IOException {
+ S3FileSystem fs = new S3FileSystem(getFileSystemStore());
+ fs.initialize(URI.create(initializationUri), new Configuration());
+ assertEquals(URI.create(expectedUri), fs.getUri());
+ }
+
+}