You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/14 00:12:09 UTC

svn commit: r486886 - in /lucene/hadoop/trunk: ./ conf/ lib/ src/java/org/apache/hadoop/fs/s3/ src/test/ src/test/org/apache/hadoop/fs/s3/

Author: cutting
Date: Wed Dec 13 15:12:07 2006
New Revision: 486886

URL: http://svn.apache.org/viewvc?view=rev&rev=486886
Log:
HADOOP-574.  Add an Amazon S3 FileSystem implementation.  Contributed by Tom White.

Added:
    lucene/hadoop/trunk/lib/commons-codec-1.3.jar   (with props)
    lucene/hadoop/trunk/lib/commons-httpclient-3.0.1.jar   (with props)
    lucene/hadoop/trunk/lib/jets3t.jar   (with props)
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/test/hadoop-site.xml

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=486886&r1=486885&r2=486886
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Dec 13 15:12:07 2006
@@ -92,6 +92,13 @@
 26. HADOOP-454.  Add a 'dfs -dus' command that provides summary disk
     usage.  (Hairong Kuang via cutting)
 
+27. HADOOP-574.  Add an Amazon S3 implementation of FileSystem.  To
+    use this, one need only specify paths of the form
+    s3://id:secret@bucket/.  Alternately, the AWS access key id and
+    secret can be specified in your config, with the properties
+    fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey.
+    (Tom White via cutting)
+
 
 Release 0.9.1 - 2006-12-06
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=486886&r1=486885&r2=486886
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Dec 13 15:12:07 2006
@@ -119,6 +119,12 @@
 </property>
 
 <property>
+  <name>fs.s3.impl</name>
+  <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
+  <description>The FileSystem for s3: uris.</description>
+</property>
+
+<property>
   <name>dfs.datanode.bindAddress</name>
   <value>0.0.0.0</value>
   <description>

Added: lucene/hadoop/trunk/lib/commons-codec-1.3.jar
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/commons-codec-1.3.jar?view=auto&rev=486886
==============================================================================
Binary file - no diff available.

Propchange: lucene/hadoop/trunk/lib/commons-codec-1.3.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: lucene/hadoop/trunk/lib/commons-httpclient-3.0.1.jar
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/commons-httpclient-3.0.1.jar?view=auto&rev=486886
==============================================================================
Binary file - no diff available.

Propchange: lucene/hadoop/trunk/lib/commons-httpclient-3.0.1.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: lucene/hadoop/trunk/lib/jets3t.jar
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/jets3t.jar?view=auto&rev=486886
==============================================================================
Binary file - no diff available.

Propchange: lucene/hadoop/trunk/lib/jets3t.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Block.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,26 @@
+package org.apache.hadoop.fs.s3;
+
+class Block {
+  private long id;
+
+  private long length;
+
+  public Block(long id, long length) {
+    this.id = id;
+    this.length = length;
+  }
+
+  public long getId() {
+    return id;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  @Override
+  public String toString() {
+    return "Block[" + id + ", " + length + "]";
+  }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,40 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+interface FileSystemStore {
+  
+  void initialize(URI uri, Configuration conf) throws IOException;
+
+  void storeINode(Path path, INode inode) throws IOException;
+  void storeBlock(Block block, InputStream in) throws IOException;
+  
+  boolean inodeExists(Path path) throws IOException;
+  boolean blockExists(long blockId) throws IOException;
+
+  INode getINode(Path path) throws IOException;
+  InputStream getBlockStream(Block block, long byteRangeStart) throws IOException;
+
+  void deleteINode(Path path) throws IOException;
+  void deleteBlock(Block block) throws IOException;
+
+  Set<Path> listSubPaths(Path path) throws IOException;
+
+  /**
+   * Delete everything. Used for testing.
+   * @throws IOException
+   */
+  void purge() throws IOException;
+  
+  /**
+   * Diagnostic method to dump all INodes to the console.
+   * @throws IOException
+   */
+  void dump() throws IOException;
+}
\ No newline at end of file

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/INode.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,99 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Holds file metadata including type (regular file, or directory),
+ * and the list of blocks that are pointers to the data.
+ */
+class INode {
+	
+  enum FileType {
+    DIRECTORY, FILE
+  }
+  
+  public static final FileType[] FILE_TYPES = {
+    FileType.DIRECTORY,
+    FileType.FILE
+  };
+
+  public static final INode DIRECTORY_INODE = new INode(FileType.DIRECTORY, null);
+  
+  private FileType fileType;
+  private Block[] blocks;
+
+  public INode(FileType fileType, Block[] blocks) {
+    this.fileType = fileType;
+    if (isDirectory() && blocks != null) {
+      throw new IllegalArgumentException("A directory cannot contain blocks.");
+    }
+    this.blocks = blocks;
+  }
+
+  public Block[] getBlocks() {
+    return blocks;
+  }
+  
+  public FileType getFileType() {
+    return fileType;
+  }
+
+  public boolean isDirectory() {
+    return fileType == FileType.DIRECTORY;
+  }  
+
+  public boolean isFile() {
+    return fileType == FileType.FILE;
+  }
+  
+  public long getSerializedLength() {
+    return 1L + (blocks == null ? 0 : 4 + blocks.length * 16);
+  }
+  
+
+  public InputStream serialize() throws IOException {
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(bytes);
+    out.writeByte(fileType.ordinal());
+    if (isFile()) {
+      out.writeInt(blocks.length);
+      for (int i = 0; i < blocks.length; i++) {
+        out.writeLong(blocks[i].getId());
+        out.writeLong(blocks[i].getLength());
+      }
+    }
+    out.close();
+    return new ByteArrayInputStream(bytes.toByteArray());
+  }
+  
+  public static INode deserialize(InputStream in) throws IOException {
+    if (in == null) {
+      return null;
+    }
+    DataInputStream dataIn = new DataInputStream(in);
+    FileType fileType = INode.FILE_TYPES[dataIn.readByte()];
+    switch (fileType) {
+    case DIRECTORY:
+      in.close();
+      return INode.DIRECTORY_INODE;
+    case FILE:
+      int numBlocks = dataIn.readInt();
+      Block[] blocks = new Block[numBlocks];
+      for (int i = 0; i < numBlocks; i++) {
+        long id = dataIn.readLong();
+        long length = dataIn.readLong();
+        blocks[i] = new Block(id, length);
+      }
+      in.close();
+      return new INode(fileType, blocks);
+    default:
+      throw new IllegalArgumentException("Cannot deserialize inode.");
+    }    
+  }  
+  
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,296 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.INode.FileType;
+import org.jets3t.service.S3Service;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.security.AWSCredentials;
+
+class Jets3tFileSystemStore implements FileSystemStore {
+
+  private static final String PATH_DELIMITER = urlEncode(Path.SEPARATOR);
+  private static final String BLOCK_PREFIX = "block_";
+
+  private S3Service s3Service;
+
+  private S3Bucket bucket;
+
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    try {
+      String accessKey = null;
+      String secretAccessKey = null;
+      String userInfo = uri.getUserInfo();
+      if (userInfo != null) {
+          int index = userInfo.indexOf(':');
+          if (index != -1) {
+	          accessKey = userInfo.substring(0, index);
+	          secretAccessKey = userInfo.substring(index + 1);
+          } else {
+        	  accessKey = userInfo;
+          }
+      }
+      if (accessKey == null) {
+    	  accessKey = conf.get("fs.s3.awsAccessKeyId");
+      }
+      if (secretAccessKey == null) {
+    	  secretAccessKey = conf.get("fs.s3.awsSecretAccessKey");
+      }
+      if (accessKey == null && secretAccessKey == null) {
+    	  throw new IllegalArgumentException("AWS " +
+    	  		"Access Key ID and Secret Access Key " +
+    	  		"must be specified as the username " +
+    	  		"or password (respectively) of a s3 URL, " +
+    	  		"or by setting the " +
+	  		    "fs.s3.awsAccessKeyId or " +    	  		
+    	  		"fs.s3.awsSecretAccessKey properties (respectively).");
+      } else if (accessKey == null) {
+    	  throw new IllegalArgumentException("AWS " +
+      	  		"Access Key ID must be specified " +
+      	  		"as the username of a s3 URL, or by setting the " +
+      	  		"fs.s3.awsAccessKeyId property.");
+      } else if (secretAccessKey == null) {
+    	  throw new IllegalArgumentException("AWS " +
+    	  		"Secret Access Key must be specified " +
+    	  		"as the password of a s3 URL, or by setting the " +
+    	  		"fs.s3.awsSecretAccessKey property.");    	  
+      }
+      AWSCredentials awsCredentials = new AWSCredentials(accessKey, secretAccessKey);
+      this.s3Service = new RestS3Service(awsCredentials);
+    } catch (S3ServiceException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw new S3Exception(e);
+    }
+    bucket = new S3Bucket(uri.getHost());
+
+    createBucket(bucket.getName());
+  }  
+
+  private void createBucket(String bucketName) throws IOException {
+    try {
+      s3Service.createBucket(bucketName);
+    } catch (S3ServiceException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw new S3Exception(e);
+    }
+  }
+
+  private void delete(String key) throws IOException {
+    try {
+      s3Service.deleteObject(bucket, key);
+    } catch (S3ServiceException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw new S3Exception(e);
+    }
+  }
+
+  public void deleteINode(Path path) throws IOException {
+    delete(pathToKey(path));
+  }
+
+  public void deleteBlock(Block block) throws IOException {
+    delete(blockToKey(block));
+  }
+
+  public boolean inodeExists(Path path) throws IOException {
+    InputStream in = get(pathToKey(path));
+    if (in == null) {
+      return false;
+    }
+    in.close();
+    return true;
+  }
+  
+  public boolean blockExists(long blockId) throws IOException {
+    InputStream in = get(blockToKey(blockId));
+    if (in == null) {
+      return false;
+    }
+    in.close();
+    return true;
+  }
+
+  private InputStream get(String key) throws IOException {
+    try {
+      S3Object object = s3Service.getObject(bucket, key);
+      return object.getDataInputStream();
+    } catch (S3ServiceException e) {
+      if (e.getErrorCode().equals("NoSuchKey")) {
+        return null;
+      }
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw new S3Exception(e);
+    }
+  }
+
+  private InputStream get(String key, long byteRangeStart) throws IOException {
+    try {
+      S3Object object = s3Service.getObject(bucket, key, null, null, null,
+          null, byteRangeStart, null);
+      return object.getDataInputStream();
+    } catch (S3ServiceException e) {
+      if (e.getErrorCode().equals("NoSuchKey")) {
+        return null;
+      }
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw new S3Exception(e);
+    }
+  }
+
+  public INode getINode(Path path) throws IOException {
+    return INode.deserialize(get(pathToKey(path)));
+  }
+
+  public InputStream getBlockStream(Block block, long byteRangeStart)
+      throws IOException {
+    return get(blockToKey(block), byteRangeStart);
+  }
+
+  public Set<Path> listSubPaths(Path path) throws IOException {
+    try {
+      String prefix = pathToKey(path);
+      if (!prefix.endsWith(PATH_DELIMITER)) {
+        prefix += PATH_DELIMITER;
+      }
+      S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER, 0);
+      Set<Path> prefixes = new TreeSet<Path>();
+      for (int i = 0; i < objects.length; i++) {
+        prefixes.add(keyToPath(objects[i].getKey()));
+      }
+      prefixes.remove(path);
+      return prefixes;
+    } catch (S3ServiceException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw new S3Exception(e);
+    }
+  }
+
+  private void put(String key, InputStream in, long length) throws IOException {
+    try {
+      S3Object object = new S3Object(key);
+      object.setDataInputStream(in);
+      object.setContentType("binary/octet-stream");
+      object.setContentLength(length);
+      s3Service.putObject(bucket, object);
+    } catch (S3ServiceException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw new S3Exception(e);
+    }
+  }
+
+  public void storeINode(Path path, INode inode) throws IOException {
+    put(pathToKey(path), inode.serialize(), inode.getSerializedLength());
+  }
+
+  public void storeBlock(Block block, InputStream in) throws IOException {
+    put(blockToKey(block), in, block.getLength());
+  }
+
+  private String pathToKey(Path path) {
+    if (!path.isAbsolute()) {
+      throw new IllegalArgumentException("Path must be absolute: " + path);
+    }
+    return urlEncode(path.toString());
+  }
+
+  private Path keyToPath(String key) {
+    return new Path(urlDecode(key));
+  }
+  
+  private static String urlEncode(String s) {
+    try {
+      return URLEncoder.encode(s, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      // Should never happen since every implementation of the Java Platform
+      // is required to support UTF-8.
+      // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
+      throw new IllegalStateException(e);
+    }
+  }
+  
+  private static String urlDecode(String s) {
+    try {
+      return URLDecoder.decode(s, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      // Should never happen since every implementation of the Java Platform
+      // is required to support UTF-8.
+      // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private String blockToKey(long blockId) {
+    return BLOCK_PREFIX + blockId;
+  }
+
+  private String blockToKey(Block block) {
+    return blockToKey(block.getId());
+  }
+
+  public void purge() throws IOException {
+    try {
+      S3Object[] objects = s3Service.listObjects(bucket);
+      for (int i = 0; i < objects.length; i++) {
+        s3Service.deleteObject(bucket, objects[i].getKey());
+      }
+    } catch (S3ServiceException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw new S3Exception(e);
+    }
+  }
+
+  public void dump() throws IOException {
+    StringBuilder sb = new StringBuilder("S3 Filesystem, ");
+    sb.append(bucket.getName()).append("\n");
+    try {
+      S3Object[] objects = s3Service.listObjects(bucket, PATH_DELIMITER, null);
+      for (int i = 0; i < objects.length; i++) {
+        Path path = keyToPath(objects[i].getKey());
+        sb.append(path).append("\n");
+        INode m = getINode(path);
+        sb.append("\t").append(m.getFileType()).append("\n");
+        if (m.getFileType() == FileType.DIRECTORY) {
+          continue;
+        }
+        for (int j = 0; j < m.getBlocks().length; j++) {
+          sb.append("\t").append(m.getBlocks()[j]).append("\n");
+        }
+      }
+    } catch (S3ServiceException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw new S3Exception(e);
+    }
+    System.out.println(sb);
+  }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3Exception.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,12 @@
+package org.apache.hadoop.fs.s3;
+
+/**
+ * Thrown if there is a problem communicating with Amazon S3.
+ */
+public class S3Exception extends RuntimeException {
+
+  public S3Exception(Throwable t) {
+    super(t);
+  }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,320 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * <p>
+ * A {@link FileSystem} backed by <a href="http://aws.amazon.com/s3">Amazon S3</a>.
+ * </p>
+ * @author Tom White
+ */
+public class S3FileSystem extends FileSystem {
+
+  private static final long DEFAULT_BLOCK_SIZE = 1 * 1024 * 1024;
+  
+  private URI uri;
+
+  private FileSystemStore store;
+
+  private FileSystem localFs;
+
+  private Path workingDir = new Path("/user", System.getProperty("user.name"));
+
+  public S3FileSystem() {
+    this(new Jets3tFileSystemStore());
+  }
+
+  public S3FileSystem(FileSystemStore store) {
+    this.store = store;
+  }
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    store.initialize(uri, conf);
+    setConf(conf);
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());    
+    this.localFs = get(URI.create("file:///"), conf);
+  }  
+
+  @Override
+  public String getName() {
+    return getUri().toString();
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  @Override
+  public void setWorkingDirectory(Path dir) {
+    workingDir = makeAbsolute(dir);
+  }
+
+  private Path makeAbsolute(Path path) {
+    if (path.isAbsolute()) {
+      return path;
+    }
+    return new Path(workingDir, path);
+  }
+
+  @Override
+  public boolean exists(Path path) throws IOException {
+    return store.inodeExists(makeAbsolute(path));
+  }
+
+  @Override
+  public boolean mkdirs(Path path) throws IOException {
+    Path absolutePath = makeAbsolute(path);
+    INode inode = store.getINode(absolutePath);
+    if (inode == null) {
+      store.storeINode(path, INode.DIRECTORY_INODE);
+    } else if (inode.isFile()) {
+      throw new IOException(String.format(
+          "Can't make directory for path %s since it is a file.", path));
+    }
+    Path parent = path.getParent();
+    return (parent == null || mkdirs(parent));
+  }
+
+  @Override
+  public boolean isDirectory(Path path) throws IOException {
+    INode inode = store.getINode(makeAbsolute(path));
+    if (inode == null) {
+      return false;
+    }
+    return inode.isDirectory();
+  }
+
+  @Override
+  public boolean isFile(Path path) throws IOException {
+    INode inode = store.getINode(makeAbsolute(path));
+    if (inode == null) {
+      return false;
+    }
+    return inode.isFile();
+  }
+
+  private INode checkFile(Path path) throws IOException {
+    INode inode = store.getINode(makeAbsolute(path));
+    if (inode == null) {
+      throw new IOException("No such file.");
+    }
+    if (inode.isDirectory()) {
+      throw new IOException("Path " + path + " is a directory.");
+    }
+    return inode;
+  }
+
+  @Override
+  public Path[] listPathsRaw(Path path) throws IOException {
+    INode inode = store.getINode(makeAbsolute(path));
+    if (inode == null) {
+      return null;
+    } else if (inode.isFile()) {
+      return new Path[] { path };
+    } else { // directory
+      Set<Path> paths = store.listSubPaths(path);
+      return paths.toArray(new Path[0]);
+    }
+  }
+
+  @Override
+  public FSOutputStream createRaw(Path file, boolean overwrite,
+      short replication, long blockSize) throws IOException {
+
+    return createRaw(file, overwrite, replication, blockSize, null);
+  }
+
+  @Override
+  public FSOutputStream createRaw(Path file, boolean overwrite,
+      short replication, long blockSize, Progressable progress)
+      throws IOException {
+
+    if (!isDirectory(file.getParent())) {
+      throw new IOException("Cannot create file " + file
+          + " since parent directory does not exist.");
+    }
+    INode inode = store.getINode(makeAbsolute(file));
+    if (inode != null) {
+      if (overwrite) {
+        deleteRaw(file);
+      } else {
+        throw new IOException("File already exists: " + file);
+      }
+    }
+    return new S3OutputStream(getConf(), store, makeAbsolute(file),
+        blockSize, progress);
+  }
+
+  @Override
+  public FSInputStream openRaw(Path path) throws IOException {
+    INode inode = checkFile(path);
+    return new S3InputStream(getConf(), store, inode);
+  }
+
+  @Override
+  public boolean renameRaw(Path src, Path dst) throws IOException {
+    // TODO: Check corner cases: dst already exists,
+    // or if path is directory with children
+    Path absoluteSrc = makeAbsolute(src);
+    INode inode = store.getINode(absoluteSrc);
+    if (inode == null) {
+      throw new IOException("No such file.");
+    }
+    store.storeINode(makeAbsolute(dst), inode);
+    store.deleteINode(absoluteSrc);
+    return true;
+  }
+
+  @Override
+  public boolean deleteRaw(Path path) throws IOException {
+    // TODO: Check if path is directory with children
+    Path absolutePath = makeAbsolute(path);
+    INode inode = store.getINode(absolutePath);
+    if (inode == null) {
+      throw new IOException("No such file or directory.");
+    }
+    store.deleteINode(absolutePath);
+    if (inode.isFile()) {
+      for (Block block : inode.getBlocks()) {
+        store.deleteBlock(block);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public long getLength(Path path) throws IOException {
+    INode inode = checkFile(path);
+    long length = 0;
+    for (Block block : inode.getBlocks()) {
+      length += block.getLength();
+    }
+    return length;
+  }
+
+  /**
+   * Replication is not supported for S3 file systems since S3 handles it for
+   * us.
+   */
+  @Override
+  public short getReplication(Path path) throws IOException {
+    return 1;
+  }
+
+  @Override
+  public short getDefaultReplication() {
+    return 1;
+  }
+
+  /**
+   * Replication is not supported for S3 file systems since S3 handles it for
+   * us.
+   */
+  @Override
+  public boolean setReplicationRaw(Path path, short replication)
+      throws IOException {
+    return true;
+  }
+
+  @Override
+  public long getBlockSize(Path path) throws IOException {
+    INode inode = store.getINode(makeAbsolute(path));
+    if (inode == null) {
+      throw new IOException("No such file or directory.");
+    }
+    Block[] blocks = inode.getBlocks();
+    if (blocks == null || blocks.length == 0) {
+      return 0;
+    }
+    return blocks[0].getLength();
+  }
+
+  @Override
+  public long getDefaultBlockSize() {
+    return getConf().getLong("fs.s3.block.size", DEFAULT_BLOCK_SIZE);
+  }
+
+  /**
+   * Return 1x1 'localhost' cell if the file exists. Return null if otherwise.
+   */
+  @Override
+  public String[][] getFileCacheHints(Path f, long start, long len)
+      throws IOException {
+    // TODO: Check this is the correct behavior
+    if (!exists(f)) {
+      return null;
+    }
+    return new String[][] { { "localhost" } };
+  }
+
+  @Override
+  public void lock(Path path, boolean shared) throws IOException {
+    // TODO: Design and implement
+  }
+
+  @Override
+  public void release(Path path) throws IOException {
+    // TODO: Design and implement
+  }
+
+  @Override
+  public void reportChecksumFailure(Path path, FSInputStream in,
+      long start, long length, int crc) {
+    // TODO: What to do here?
+  }
+
+  @Override
+  public void moveFromLocalFile(Path src, Path dst) throws IOException {
+    FileUtil.copy(localFs, src, this, dst, true, getConf());
+  }
+
+  @Override
+  public void copyFromLocalFile(Path src, Path dst) throws IOException {
+    FileUtil.copy(localFs, src, this, dst, false, true, getConf());
+  }
+
+  @Override
+  public void copyToLocalFile(Path src, Path dst, boolean copyCrc) throws IOException {
+    FileUtil.copy(this, src, localFs, dst, false, copyCrc, getConf());
+  }
+
+  @Override
+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+      throws IOException {
+    return tmpLocalFile;
+  }
+
+  @Override
+  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+      throws IOException {
+    moveFromLocalFile(tmpLocalFile, fsOutputFile);
+  }
+
+  // diagnostic methods
+
+  void dump() throws IOException {
+    store.dump();
+  }
+
+  void purge() throws IOException {
+    store.purge();
+  }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,176 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+
+class S3InputStream extends FSInputStream {
+
+  private int bufferSize;
+
+  private FileSystemStore store;
+
+  private Block[] blocks;
+
+  private boolean closed;
+
+  private long fileLength;
+
+  private long pos = 0;
+
+  private DataInputStream blockStream;
+
+  private long blockEnd = -1;
+
+  public S3InputStream(Configuration conf, FileSystemStore store,
+      INode inode) {
+    
+    this.store = store;
+    this.blocks = inode.getBlocks();
+    for (Block block : blocks) {
+      this.fileLength += block.getLength();
+    }
+    this.bufferSize = conf.getInt("io.file.buffer.size", 4096);    
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    return pos;
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    return (int) (fileLength - pos);
+  }
+
+  @Override
+  public synchronized void seek(long targetPos) throws IOException {
+    if (targetPos > fileLength) {
+      throw new IOException("Cannot seek after EOF");
+    }
+    pos = targetPos;
+    blockEnd = -1;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    int result = -1;
+    if (pos < fileLength) {
+      if (pos > blockEnd) {
+        blockSeekTo(pos);
+      }
+      result = blockStream.read();
+      if (result >= 0) {
+        pos++;
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public synchronized int read(byte buf[], int off, int len) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    if (pos < fileLength) {
+      if (pos > blockEnd) {
+        blockSeekTo(pos);
+      }
+      int realLen = Math.min(len, (int) (blockEnd - pos + 1));
+      int result = blockStream.read(buf, off, realLen);
+      if (result >= 0) {
+        pos += result;
+      }
+      return result;
+    }
+    return -1;
+  }
+
+  private synchronized void blockSeekTo(long target) throws IOException {
+    //
+    // Compute desired block
+    //
+    int targetBlock = -1;
+    long targetBlockStart = 0;
+    long targetBlockEnd = 0;
+    for (int i = 0; i < blocks.length; i++) {
+      long blockLength = blocks[i].getLength();
+      targetBlockEnd = targetBlockStart + blockLength - 1;
+
+      if (target >= targetBlockStart && target <= targetBlockEnd) {
+        targetBlock = i;
+        break;
+      } else {
+        targetBlockStart = targetBlockEnd + 1;
+      }
+    }
+    if (targetBlock < 0) {
+      throw new IOException(
+          "Impossible situation: could not find target position " + target);
+    }
+    long offsetIntoBlock = target - targetBlockStart;
+
+    // read block blocks[targetBlock] from position offsetIntoBlock
+
+    File fileBlock = File.createTempFile("s3fs-in", "");
+    fileBlock.deleteOnExit();
+    InputStream in = store.getBlockStream(blocks[targetBlock], offsetIntoBlock);
+    OutputStream out = new BufferedOutputStream(new FileOutputStream(fileBlock));
+    byte[] buf = new byte[bufferSize];
+    int numRead;
+    while ((numRead = in.read(buf)) >= 0) {
+      out.write(buf, 0, numRead);
+    }
+    out.close();
+    in.close();
+
+    this.pos = target;
+    this.blockEnd = targetBlockEnd;
+    this.blockStream = new DataInputStream(new FileInputStream(fileBlock));
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    if (blockStream != null) {
+      blockStream.close();
+      blockStream.close();
+      blockStream = null;
+    }
+    super.close();
+    closed = true;
+  }
+
+  /**
+   * We don't support marks.
+   */
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  @Override
+  public void mark(int readLimit) {
+    // Do nothing
+  }
+
+  @Override
+  public void reset() throws IOException {
+    throw new IOException("Mark not supported");
+  }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3OutputStream.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,199 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.INode.FileType;
+import org.apache.hadoop.util.Progressable;
+
+class S3OutputStream extends FSOutputStream {
+
+  private int bufferSize;
+
+  private FileSystemStore store;
+
+  private Path path;
+
+  private long blockSize;
+
+  private File backupFile;
+
+  private OutputStream backupStream;
+
+  private Random r = new Random();
+
+  private boolean closed;
+
+  private int pos = 0;
+
+  private long filePos = 0;
+
+  private int bytesWrittenToBlock = 0;
+
+  private byte[] outBuf;
+
+  private List<Block> blocks = new ArrayList<Block>();
+
+  private Block nextBlock;
+
+  public S3OutputStream(Configuration conf, FileSystemStore store,
+      Path path, long blockSize, Progressable progress) throws IOException {
+    
+    this.store = store;
+    this.path = path;
+    this.blockSize = blockSize;
+    this.backupFile = newBackupFile();
+    this.backupStream = new FileOutputStream(backupFile);
+    this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
+    this.outBuf = new byte[bufferSize];
+
+  }
+
+  private File newBackupFile() throws IOException {
+    File result = File.createTempFile("s3fs-out", "");
+    result.deleteOnExit();
+    return result;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return filePos;
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) {
+      flush();
+    }
+    outBuf[pos++] = (byte) b;
+    filePos++;
+  }
+
+  @Override
+  public synchronized void write(byte b[], int off, int len) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    while (len > 0) {
+      int remaining = bufferSize - pos;
+      int toWrite = Math.min(remaining, len);
+      System.arraycopy(b, off, outBuf, pos, toWrite);
+      pos += toWrite;
+      off += toWrite;
+      len -= toWrite;
+      filePos += toWrite;
+
+      if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) {
+        flush();
+      }
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    if (bytesWrittenToBlock + pos >= blockSize) {
+      flushData((int) blockSize - bytesWrittenToBlock);
+    }
+    if (bytesWrittenToBlock == blockSize) {
+      endBlock();
+    }
+    flushData(pos);
+  }
+
+  private synchronized void flushData(int maxPos) throws IOException {
+    int workingPos = Math.min(pos, maxPos);
+
+    if (workingPos > 0) {
+      //
+      // To the local block backup, write just the bytes
+      //
+      backupStream.write(outBuf, 0, workingPos);
+
+      //
+      // Track position
+      //
+      bytesWrittenToBlock += workingPos;
+      System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
+      pos -= workingPos;
+    }
+  }
+
+  private synchronized void endBlock() throws IOException {
+    //
+    // Done with local copy
+    //
+    backupStream.close();
+
+    //
+    // Send it to S3
+    //
+    // TODO: Use passed in Progressable to report progress.
+    nextBlockOutputStream();
+    InputStream in = new FileInputStream(backupFile);
+    store.storeBlock(nextBlock, in);
+    in.close();
+    internalClose();
+
+    //
+    // Delete local backup, start new one
+    //
+    backupFile.delete();
+    backupFile = newBackupFile();
+    backupStream = new FileOutputStream(backupFile);
+    bytesWrittenToBlock = 0;
+  }
+
+  private synchronized void nextBlockOutputStream() throws IOException {
+    long blockId = r.nextLong();
+    while (store.blockExists(blockId)) {
+      blockId = r.nextLong();
+    }
+    nextBlock = new Block(blockId, bytesWrittenToBlock);
+    blocks.add(nextBlock);
+    bytesWrittenToBlock = 0;
+  }
+
+  private synchronized void internalClose() throws IOException {
+    INode inode = new INode(FileType.FILE, blocks.toArray(new Block[blocks
+        .size()]));
+    store.storeINode(path, inode);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    flush();
+    if (filePos == 0 || bytesWrittenToBlock != 0) {
+      endBlock();
+    }
+
+    backupStream.close();
+    backupFile.delete();
+
+    super.close();
+
+    closed = true;
+  }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html Wed Dec 13 15:12:07 2006
@@ -0,0 +1,34 @@
+<html>
+<body>
+
+<p>A distributed implementation of {@link
+org.apache.hadoop.fs.FileSystem} that uses <a href="http://aws.amazon.com/s3">Amazon S3</a>.</p>
+
+<p>
+Files are stored in S3 as blocks (represented by {@link Block}), which have an ID and a length.
+Block metadata is stored in S3 as a small record (represented by {@link INode}) using the URL-encoded
+path string as a key. Inodes record the file type (regular file or directory) and the list of blocks.
+This design makes it easy to seek to any given position in a file by reading the inode data to compute
+which block to access, then using S3's support for 
+<a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35.2">HTTP Range</a> headers
+to start streaming from the correct position.
+Renames are also efficient since only the inode is moved (by a DELETE followed by a PUT since 
+S3 does not support renames).
+</p>
+<p>
+For a single file <i>/dir1/file1</i> which takes two blocks of storage, the file structure in S3
+would be something like this:
+</p>
+<pre>
+/
+/dir1
+/dir1/file1
+block-6415776850131549260
+block-3026438247347758425
+</pre>
+<p>
+Inodes start with a leading <code>/</code>, while blocks are prefixed with <code>block-</code>.
+</p>
+
+</body>
+</html>

Modified: lucene/hadoop/trunk/src/test/hadoop-site.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/hadoop-site.xml?view=diff&rev=486886&r1=486885&r2=486886
==============================================================================
--- lucene/hadoop/trunk/src/test/hadoop-site.xml (original)
+++ lucene/hadoop/trunk/src/test/hadoop-site.xml Wed Dec 13 15:12:07 2006
@@ -14,5 +14,16 @@
   <description>A base for other temporary directories.</description>
 </property>
 
+<property>
+  <name>test.fs.s3.name</name>
+  <value>s3:///</value>
+  <description>The name of the s3 file system for testing.</description>
+</property>
+
+<property>
+  <name>fs.s3.block.size</name>
+  <value>128</value>
+  <description>Size of a block in bytes.</description>
+</property>
 
 </configuration>

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,108 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.INode.FileType;
+
+/**
+ * A stub implementation of {@link FileSystemStore} for testing
+ * {@link S3FileSystem} without actually connecting to S3.
+ * @author Tom White
+ */
+class InMemoryFileSystemStore implements FileSystemStore {
+  
+  private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>();
+  private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>();
+  
+  public void initialize(URI uri, Configuration conf) {
+    // Nothing to initialize
+  }
+
+  public void deleteINode(Path path) throws IOException {
+    inodes.remove(path);
+  }
+
+  public void deleteBlock(Block block) throws IOException {
+    blocks.remove(block.getId());
+  }
+
+  public boolean inodeExists(Path path) throws IOException {
+    return inodes.containsKey(path);
+  }
+
+  public boolean blockExists(long blockId) throws IOException {
+    return blocks.containsKey(blockId);
+  }
+
+  public INode getINode(Path path) throws IOException {
+    return inodes.get(path);
+  }
+
+  public InputStream getBlockStream(Block block, long byteRangeStart) throws IOException {
+    byte[] data = blocks.get(block.getId());
+    return new ByteArrayInputStream(data, (int) byteRangeStart, data.length - (int) byteRangeStart);
+  }
+
+  public Set<Path> listSubPaths(Path path) throws IOException {
+    // This is inefficient but more than adequate for testing purposes.
+    Set<Path> subPaths = new LinkedHashSet<Path>();
+    for (Path p : inodes.tailMap(path).keySet()) {
+      if (path.equals(p.getParent())) {
+        subPaths.add(p);
+      }
+    }
+    return subPaths;
+  }
+
+  public void storeINode(Path path, INode inode) throws IOException {
+    inodes.put(path, inode);
+  }
+
+  public void storeBlock(Block block, InputStream in) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    byte[] buf = new byte[8192];
+    int numRead;
+    while ((numRead = in.read(buf)) >= 0) {
+      out.write(buf, 0, numRead);
+    }
+    blocks.put(block.getId(), out.toByteArray());
+  }
+
+  public void purge() throws IOException {
+    inodes.clear();
+    blocks.clear();
+  }
+
+  public void dump() throws IOException {
+    StringBuilder sb = new StringBuilder(getClass().getSimpleName());
+    sb.append(", \n");
+    for (Map.Entry<Path, INode> entry : inodes.entrySet()) {
+      sb.append(entry.getKey()).append("\n");
+      INode inode = entry.getValue();
+      sb.append("\t").append(inode.getFileType()).append("\n");
+      if (inode.getFileType() == FileType.DIRECTORY) {
+        continue;
+      }
+      for (int j = 0; j < inode.getBlocks().length; j++) {
+        sb.append("\t").append(inode.getBlocks()[j]).append("\n");
+      }      
+    }
+    System.out.println(sb);
+    
+    System.out.println(inodes.keySet());
+    System.out.println(blocks.keySet());
+  }
+
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,12 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+
+public class Jets3tS3FileSystemTest extends S3FileSystemBaseTest {
+
+  @Override
+  public FileSystemStore getFileSystemStore() throws IOException {
+    return new Jets3tFileSystemStore();
+  }
+
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/S3FileSystemBaseTest.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,232 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.Path;
+
+public abstract class S3FileSystemBaseTest extends TestCase {
+  
+  private static final int BLOCK_SIZE = 128;
+  
+  private S3FileSystem s3FileSystem;
+
+  private byte[] data;
+
+  abstract FileSystemStore getFileSystemStore() throws IOException;
+
+  @Override
+  protected void setUp() throws IOException {
+    Configuration conf = new Configuration();
+    
+    s3FileSystem = new S3FileSystem(getFileSystemStore());
+    s3FileSystem.initialize(URI.create(conf.get("test.fs.s3.name")), conf);
+    
+    data = new byte[BLOCK_SIZE * 2];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (byte) (i % 10);
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    s3FileSystem.purge();
+    s3FileSystem.close();
+  }
+  
+  public void testWorkingDirectory() throws Exception {
+
+    Path homeDir = new Path("/user/", System.getProperty("user.name"));
+    assertEquals(homeDir, s3FileSystem.getWorkingDirectory());
+
+    s3FileSystem.setWorkingDirectory(new Path("."));
+    assertEquals(homeDir, s3FileSystem.getWorkingDirectory());
+
+    s3FileSystem.setWorkingDirectory(new Path(".."));
+    assertEquals(new Path("/user/"), s3FileSystem.getWorkingDirectory());
+
+    s3FileSystem.setWorkingDirectory(new Path("hadoop"));
+    assertEquals(new Path("/user/hadoop"), s3FileSystem.getWorkingDirectory());
+
+    s3FileSystem.setWorkingDirectory(new Path("/test/hadoop"));
+    assertEquals(new Path("/test/hadoop"), s3FileSystem.getWorkingDirectory());
+
+  }
+
+  public void testMkdirs() throws Exception {
+    Path testDir = new Path("/test/hadoop");
+    assertFalse(s3FileSystem.exists(testDir));
+    assertFalse(s3FileSystem.isDirectory(testDir));
+    assertFalse(s3FileSystem.isFile(testDir));
+
+    assertTrue(s3FileSystem.mkdirs(testDir));
+
+    assertTrue(s3FileSystem.exists(testDir));
+    assertTrue(s3FileSystem.isDirectory(testDir));
+    assertFalse(s3FileSystem.isFile(testDir));
+
+    Path parentDir = testDir.getParent();
+    assertTrue(s3FileSystem.exists(parentDir));
+    assertTrue(s3FileSystem.isDirectory(parentDir));
+    assertFalse(s3FileSystem.isFile(parentDir));
+
+    Path grandparentDir = parentDir.getParent();
+    assertTrue(s3FileSystem.exists(grandparentDir));
+    assertTrue(s3FileSystem.isDirectory(grandparentDir));
+    assertFalse(s3FileSystem.isFile(grandparentDir));
+  }
+
+  public void testListPathsRaw() throws Exception {
+    Path[] testDirs = { new Path("/test/hadoop/a"), new Path("/test/hadoop/b"),
+        new Path("/test/hadoop/c/1"), };
+    assertNull(s3FileSystem.listPathsRaw(testDirs[0]));
+
+    for (Path path : testDirs) {
+      assertTrue(s3FileSystem.mkdirs(path));
+    }
+
+    Path[] paths = s3FileSystem.listPathsRaw(new Path("/"));
+
+    assertEquals(1, paths.length);
+    assertEquals(new Path("/test"), paths[0]);
+
+    paths = s3FileSystem.listPathsRaw(new Path("/test"));
+    assertEquals(1, paths.length);
+    assertEquals(new Path("/test/hadoop"), paths[0]);
+
+    paths = s3FileSystem.listPathsRaw(new Path("/test/hadoop"));
+    assertEquals(3, paths.length);
+    assertEquals(new Path("/test/hadoop/a"), paths[0]);
+    assertEquals(new Path("/test/hadoop/b"), paths[1]);
+    assertEquals(new Path("/test/hadoop/c"), paths[2]);
+
+    paths = s3FileSystem.listPathsRaw(new Path("/test/hadoop/a"));
+    assertEquals(0, paths.length);
+  }
+
+  public void testWriteReadAndDeleteEmptyFile() throws Exception {
+    writeReadAndDelete(0);
+  }
+
+  public void testWriteReadAndDeleteHalfABlock() throws Exception {
+    writeReadAndDelete(BLOCK_SIZE / 2);
+  }
+
+  public void testWriteReadAndDeleteOneBlock() throws Exception {
+    writeReadAndDelete(BLOCK_SIZE);
+  }
+  
+  public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
+    writeReadAndDelete(BLOCK_SIZE + BLOCK_SIZE / 2);
+  }
+  
+  public void testWriteReadAndDeleteTwoBlocks() throws Exception {
+    writeReadAndDelete(BLOCK_SIZE * 2);
+  }
+  
+  
+  private void writeReadAndDelete(int len) throws IOException {
+    Path path = new Path("/test/hadoop/file");
+    
+    s3FileSystem.mkdirs(path.getParent());
+
+    FSOutputStream out = s3FileSystem.createRaw(path, false, (short) 1, BLOCK_SIZE);
+    out.write(data, 0, len);
+    out.close();
+
+    assertTrue("Exists", s3FileSystem.exists(path));
+    
+    assertEquals("Block size", Math.min(len, BLOCK_SIZE), s3FileSystem.getBlockSize(path));
+
+    assertEquals("Length", len, s3FileSystem.getLength(path));
+
+    FSInputStream in = s3FileSystem.openRaw(path);
+    byte[] buf = new byte[len];
+
+    in.readFully(0, buf);
+
+    assertEquals(len, buf.length);
+    for (int i = 0; i < buf.length; i++) {
+      assertEquals("Position " + i, data[i], buf[i]);
+    }
+    
+    assertTrue("Deleted", s3FileSystem.deleteRaw(path));
+    
+    assertFalse("No longer exists", s3FileSystem.exists(path));
+
+  }
+
+  public void testOverwrite() throws IOException {
+    Path path = new Path("/test/hadoop/file");
+    
+    s3FileSystem.mkdirs(path.getParent());
+
+    FSOutputStream out = s3FileSystem.createRaw(path, false, (short) 1, BLOCK_SIZE);
+    out.write(data, 0, BLOCK_SIZE);
+    out.close();
+    
+    assertTrue("Exists", s3FileSystem.exists(path));
+    assertEquals("Length", BLOCK_SIZE, s3FileSystem.getLength(path));
+    
+    try {
+      s3FileSystem.createRaw(path, false, (short) 1, 128);
+      fail("Should throw IOException.");
+    } catch (IOException e) {
+      // Expected
+    }
+    
+    out = s3FileSystem.createRaw(path, true, (short) 1, BLOCK_SIZE);
+    out.write(data, 0, BLOCK_SIZE / 2);
+    out.close();
+    
+    assertTrue("Exists", s3FileSystem.exists(path));
+    assertEquals("Length", BLOCK_SIZE / 2, s3FileSystem.getLength(path));
+    
+  }
+
+  public void testWriteInNonExistentDirectory() {
+    Path path = new Path("/test/hadoop/file");    
+    try {
+      s3FileSystem.createRaw(path, false, (short) 1, 128);
+      fail("Should throw IOException.");
+    } catch (IOException e) {
+      // Expected
+    }
+  }
+
+  public void testRename() throws Exception {
+    int len = BLOCK_SIZE;
+    
+    Path path = new Path("/test/hadoop/file");
+    
+    s3FileSystem.mkdirs(path.getParent());
+
+    FSOutputStream out = s3FileSystem.createRaw(path, false, (short) 1, BLOCK_SIZE);
+    out.write(data, 0, len);
+    out.close();
+
+    assertTrue("Exists", s3FileSystem.exists(path));
+
+    Path newPath = new Path("/test/hadoop/newfile");
+    s3FileSystem.rename(path, newPath);
+    assertFalse("No longer exists", s3FileSystem.exists(path));
+    assertTrue("Moved", s3FileSystem.exists(newPath));
+
+    FSInputStream in = s3FileSystem.openRaw(newPath);
+    byte[] buf = new byte[len];
+    
+    in.readFully(0, buf);
+
+    assertEquals(len, buf.length);
+    for (int i = 0; i < buf.length; i++) {
+      assertEquals("Position " + i, data[i], buf[i]);
+    }
+  }
+
+
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestINode.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,42 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.s3.INode.FileType;
+
+public class TestINode extends TestCase {
+
+  public void testSerializeFileWithSingleBlock() throws IOException {
+    Block[] blocks = { new Block(849282477840258181L, 128L) };
+    INode inode = new INode(FileType.FILE, blocks);
+
+    assertEquals("Length", 1L + 4 + 16, inode.getSerializedLength());
+    InputStream in = inode.serialize();
+
+    INode deserialized = INode.deserialize(in);
+
+    assertEquals("FileType", inode.getFileType(), deserialized.getFileType());
+    Block[] deserializedBlocks = deserialized.getBlocks();
+    assertEquals("Length", 1, deserializedBlocks.length);
+    assertEquals("Id", blocks[0].getId(), deserializedBlocks[0].getId());
+    assertEquals("Length", blocks[0].getLength(), deserializedBlocks[0]
+        .getLength());
+
+  }
+  
+  public void testSerializeDirectory() throws IOException {
+    INode inode = INode.DIRECTORY_INODE;
+    assertEquals("Length", 1L, inode.getSerializedLength());
+    InputStream in = inode.serialize();
+    INode deserialized = INode.deserialize(in);    
+    assertSame(INode.DIRECTORY_INODE, deserialized);
+  }
+  
+  public void testDeserializeNull() throws IOException {
+    assertNull(INode.deserialize(null));
+  }
+
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java?view=auto&rev=486886
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystem.java Wed Dec 13 15:12:07 2006
@@ -0,0 +1,33 @@
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class TestInMemoryS3FileSystem extends S3FileSystemBaseTest {
+
+  @Override
+  public FileSystemStore getFileSystemStore() throws IOException {
+    return new InMemoryFileSystemStore();
+  }
+  
+  public void testInitialization() throws IOException {
+    initializationTest("s3://a:b@c", "s3://a:b@c");
+    initializationTest("s3://a:b@c/", "s3://a:b@c");
+    initializationTest("s3://a:b@c/path", "s3://a:b@c");
+    initializationTest("s3://a@c", "s3://a@c");
+    initializationTest("s3://a@c/", "s3://a@c");
+    initializationTest("s3://a@c/path", "s3://a@c");
+    initializationTest("s3://c", "s3://c");
+    initializationTest("s3://c/", "s3://c");
+    initializationTest("s3://c/path", "s3://c");
+  }
+  
+  private void initializationTest(String initializationUri, String expectedUri) throws IOException {
+    S3FileSystem fs = new S3FileSystem(getFileSystemStore());
+    fs.initialize(URI.create(initializationUri), new Configuration());
+    assertEquals(URI.create(expectedUri), fs.getUri());
+  }
+
+}