You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2007/05/02 21:16:07 UTC
svn commit: r534595 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/fs/s3/ src/test/org/apache/hadoop/fs/s3/
Author: tomwhite
Date: Wed May 2 12:16:06 2007
New Revision: 534595
URL: http://svn.apache.org/viewvc?view=rev&rev=534595
Log:
HADOOP-1061. Fix bug in listing files in the S3 filesystem.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/MigrationTool.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystemException.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/VersionMismatchException.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=534595&r1=534594&r2=534595
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed May 2 12:16:06 2007
@@ -300,6 +300,12 @@
89. HADOOP-1247. Add support to contrib/streaming for aggregate
package, formerly called Abacus. (Runping Qi via cutting)
+90. HADOOP-1061. Fix bug in listing files in the S3 filesystem.
+ NOTE: this change is not backwards compatible! You should use the
+ MigrationTool supplied to migrate existing S3 filesystem data to
+ the new format. Please back up your data before upgrading
+ (using 'hadoop distcp' for example). (tomwhite)
+
Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java?view=diff&rev=534595&r1=534594&r2=534595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/FileSystemStore.java Wed May 2 12:16:06 2007
@@ -14,6 +14,7 @@
public interface FileSystemStore {
void initialize(URI uri, Configuration conf) throws IOException;
+ String getVersion() throws IOException;
void storeINode(Path path, INode inode) throws IOException;
void storeBlock(Block block, File file) throws IOException;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java?view=diff&rev=534595&r1=534594&r2=534595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java Wed May 2 12:16:06 2007
@@ -9,10 +9,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
import java.net.URI;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -27,8 +26,26 @@
import org.jets3t.service.security.AWSCredentials;
class Jets3tFileSystemStore implements FileSystemStore {
+
+ private static final String FILE_SYSTEM_NAME = "fs";
+ private static final String FILE_SYSTEM_VALUE = "Hadoop";
+
+ private static final String FILE_SYSTEM_TYPE_NAME = "fs-type";
+ private static final String FILE_SYSTEM_TYPE_VALUE = "block";
- private static final String PATH_DELIMITER = urlEncode(Path.SEPARATOR);
+ private static final String FILE_SYSTEM_VERSION_NAME = "fs-version";
+ private static final String FILE_SYSTEM_VERSION_VALUE = "1";
+
+ private static final Map<String, String> METADATA =
+ new HashMap<String, String>();
+
+ static {
+ METADATA.put(FILE_SYSTEM_NAME, FILE_SYSTEM_VALUE);
+ METADATA.put(FILE_SYSTEM_TYPE_NAME, FILE_SYSTEM_TYPE_VALUE);
+ METADATA.put(FILE_SYSTEM_VERSION_NAME, FILE_SYSTEM_VERSION_VALUE);
+ }
+
+ private static final String PATH_DELIMITER = Path.SEPARATOR;
private static final String BLOCK_PREFIX = "block_";
private Configuration conf;
@@ -94,7 +111,7 @@
createBucket(bucket.getName());
this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
- }
+ }
private void createBucket(String bucketName) throws IOException {
try {
@@ -106,6 +123,10 @@
throw new S3Exception(e);
}
}
+
+ /**
+ * Returns the filesystem version string used by this store: the value it
+ * puts under the "fs-version" metadata key and compares against when it
+ * checks an object's metadata.
+ */
+ public String getVersion() throws IOException {
+ return FILE_SYSTEM_VERSION_VALUE;
+ }
private void delete(String key) throws IOException {
try {
@@ -127,7 +148,7 @@
}
public boolean inodeExists(Path path) throws IOException {
- InputStream in = get(pathToKey(path));
+ InputStream in = get(pathToKey(path), true);
if (in == null) {
return false;
}
@@ -136,7 +157,7 @@
}
public boolean blockExists(long blockId) throws IOException {
- InputStream in = get(blockToKey(blockId));
+ InputStream in = get(blockToKey(blockId), false);
if (in == null) {
return false;
}
@@ -144,9 +165,14 @@
return true;
}
- private InputStream get(String key) throws IOException {
+ private InputStream get(String key, boolean checkMetadata)
+ throws IOException {
+
try {
S3Object object = s3Service.getObject(bucket, key);
+ if (checkMetadata) {
+ checkMetadata(object);
+ }
return object.getDataInputStream();
} catch (S3ServiceException e) {
if (e.getS3ErrorCode().equals("NoSuchKey")) {
@@ -175,8 +201,26 @@
}
}
+  /**
+   * Verifies that an S3 object carries the metadata this store writes.
+   * The checks run in a fixed order: filesystem name, filesystem type and
+   * finally filesystem version.
+   *
+   * @param object the object whose metadata is inspected
+   * @throws S3FileSystemException if the name or type metadata does not match
+   * @throws VersionMismatchException if the version metadata does not match
+   */
+  private void checkMetadata(S3Object object) throws S3FileSystemException,
+      S3ServiceException {
+
+    String fsName = (String) object.getMetadata(FILE_SYSTEM_NAME);
+    boolean isHadoopFile = FILE_SYSTEM_VALUE.equals(fsName);
+    if (!isHadoopFile) {
+      throw new S3FileSystemException("Not a Hadoop S3 file.");
+    }
+
+    String fsType = (String) object.getMetadata(FILE_SYSTEM_TYPE_NAME);
+    boolean isBlockFile = FILE_SYSTEM_TYPE_VALUE.equals(fsType);
+    if (!isBlockFile) {
+      throw new S3FileSystemException("Not a block file.");
+    }
+
+    String storedVersion =
+      (String) object.getMetadata(FILE_SYSTEM_VERSION_NAME);
+    if (FILE_SYSTEM_VERSION_VALUE.equals(storedVersion)) {
+      return;
+    }
+    throw new VersionMismatchException(FILE_SYSTEM_VERSION_VALUE,
+      storedVersion);
+  }
+
public INode retrieveINode(Path path) throws IOException {
- return INode.deserialize(get(pathToKey(path)));
+ return INode.deserialize(get(pathToKey(path), true));
}
public File retrieveBlock(Block block, long byteRangeStart)
@@ -224,7 +268,7 @@
if (!prefix.endsWith(PATH_DELIMITER)) {
prefix += PATH_DELIMITER;
}
- S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER, 0);
+ S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER);
Set<Path> prefixes = new TreeSet<Path>();
for (int i = 0; i < objects.length; i++) {
prefixes.add(keyToPath(objects[i].getKey()));
@@ -260,12 +304,17 @@
}
}
- private void put(String key, InputStream in, long length) throws IOException {
+ private void put(String key, InputStream in, long length, boolean storeMetadata)
+ throws IOException {
+
try {
S3Object object = new S3Object(key);
object.setDataInputStream(in);
object.setContentType("binary/octet-stream");
object.setContentLength(length);
+ if (storeMetadata) {
+ object.addAllMetadata(METADATA);
+ }
s3Service.putObject(bucket, object);
} catch (S3ServiceException e) {
if (e.getCause() instanceof IOException) {
@@ -276,14 +325,14 @@
}
public void storeINode(Path path, INode inode) throws IOException {
- put(pathToKey(path), inode.serialize(), inode.getSerializedLength());
+ put(pathToKey(path), inode.serialize(), inode.getSerializedLength(), true);
}
public void storeBlock(Block block, File file) throws IOException {
BufferedInputStream in = null;
try {
in = new BufferedInputStream(new FileInputStream(file));
- put(blockToKey(block), in, block.getLength());
+ put(blockToKey(block), in, block.getLength(), false);
} finally {
closeQuietly(in);
}
@@ -303,35 +352,13 @@
if (!path.isAbsolute()) {
throw new IllegalArgumentException("Path must be absolute: " + path);
}
- return urlEncode(path.toUri().getPath());
+ return path.toUri().getPath();
}
private Path keyToPath(String key) {
- return new Path(urlDecode(key));
- }
-
- private static String urlEncode(String s) {
- try {
- return URLEncoder.encode(s, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- // Should never happen since every implementation of the Java Platform
- // is required to support UTF-8.
- // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
- throw new IllegalStateException(e);
- }
+ return new Path(key);
}
- private static String urlDecode(String s) {
- try {
- return URLDecoder.decode(s, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- // Should never happen since every implementation of the Java Platform
- // is required to support UTF-8.
- // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
- throw new IllegalStateException(e);
- }
- }
-
private String blockToKey(long blockId) {
return BLOCK_PREFIX + blockId;
}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/MigrationTool.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/MigrationTool.java?view=auto&rev=534595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/MigrationTool.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/MigrationTool.java Wed May 2 12:16:06 2007
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolBase;
+import org.jets3t.service.S3Service;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.security.AWSCredentials;
+
+/**
+ * <p>
+ * This class is a tool for migrating data from an older to a newer version
+ * of an S3 filesystem.
+ * </p>
+ * <p>
+ * All files in the filesystem are migrated by re-writing the block metadata
+ * - no datafiles are touched.
+ * </p>
+ */
+public class MigrationTool extends ToolBase {
+
+ private S3Service s3Service;
+ private S3Bucket bucket;
+
+ public static void main(String[] args) throws Exception {
+ int res = new MigrationTool().doMain(new Configuration(), args);
+ System.exit(res);
+ }
+
+  /**
+   * Detects the version of the S3 filesystem named on the command line and,
+   * where a migration path exists, migrates it to the version written by
+   * {@link Jets3tFileSystemStore}.
+   *
+   * @param args command-line arguments; <code>args[0]</code> is the URI of
+   *             the S3 filesystem to migrate
+   * @return 0 if the filesystem was migrated or is already at the target
+   *         version; -1 on a usage error, if the current version cannot be
+   *         detected, or if no migration from that version is implemented
+   */
+  public int run(String[] args) throws Exception {
+
+    if (args.length == 0) {
+      System.err.println("Usage: MigrationTool <S3 file system URI>");
+      System.err.println("\t<S3 file system URI>\tfilesystem to migrate");
+      return -1;
+    }
+
+    URI uri = URI.create(args[0]);
+
+    initialize(uri, conf);
+
+    FileSystemStore newStore = new Jets3tFileSystemStore();
+    newStore.initialize(uri, conf);
+    String newVersion = newStore.getVersion();
+
+    // The unversioned layout URL-encoded its keys, so its root inode is
+    // stored under "%2F" rather than "/".
+    if (get("%2F") != null) {
+      System.err.println("Current version number is [unversioned].");
+      System.err.println("Target version number is " + newVersion + ".");
+      migrate(new UnversionedStore(), newStore);
+      return 0;
+    }
+
+    S3Object root = get("/");
+    String version =
+      (root == null) ? null : (String) root.getMetadata("fs-version");
+    if (version == null) {
+      // Report the failure exactly once and signal it to the caller.
+      System.err.println("Can't detect version - exiting.");
+      return -1;
+    }
+
+    System.err.println("Current version number is " + version + ".");
+    System.err.println("Target version number is " + newVersion + ".");
+    if (version.equals(newVersion)) {
+      System.err.println("No migration required.");
+      return 0;
+    }
+
+    // Migration between versioned layouts would pick a Store implementation
+    // based on the detected version; none exists yet, so report failure.
+    System.err.println("Not currently implemented.");
+    return -1;
+  }
+
+  /**
+   * Prepares this tool to talk to the S3 bucket named by the host part of
+   * the given URI.
+   * <p>
+   * The access key and secret key are read from the URI user-info
+   * (<code>accessKey:secretKey</code>); whichever of the two is missing
+   * there is read from the <code>fs.s3.awsAccessKeyId</code> or
+   * <code>fs.s3.awsSecretAccessKey</code> configuration property.
+   * </p>
+   *
+   * @param uri S3 filesystem URI; its host is used as the bucket name
+   * @param conf configuration kept by this tool and consulted for any
+   *             credential not present in the URI
+   * @throws IllegalArgumentException if either credential is still unknown
+   * @throws IOException if creating the S3 service fails with an I/O cause
+   */
+  public void initialize(URI uri, Configuration conf) throws IOException {
+
+    this.conf = conf;
+
+    try {
+      String keyId = null;
+      String secretKey = null;
+
+      String credentials = uri.getUserInfo();
+      if (credentials != null) {
+        int separator = credentials.indexOf(':');
+        if (separator == -1) {
+          keyId = credentials;
+        } else {
+          keyId = credentials.substring(0, separator);
+          secretKey = credentials.substring(separator + 1);
+        }
+      }
+
+      if (keyId == null) {
+        keyId = conf.get("fs.s3.awsAccessKeyId");
+      }
+      if (secretKey == null) {
+        secretKey = conf.get("fs.s3.awsSecretAccessKey");
+      }
+
+      // Each check below throws, so they are evaluated as a chain: the
+      // "both missing" case is reported before the single-value cases.
+      if (keyId == null && secretKey == null) {
+        throw new IllegalArgumentException("AWS " +
+          "Access Key ID and Secret Access Key " +
+          "must be specified as the username " +
+          "or password (respectively) of a s3 URL, " +
+          "or by setting the " +
+          "fs.s3.awsAccessKeyId or " +
+          "fs.s3.awsSecretAccessKey properties (respectively).");
+      }
+      if (keyId == null) {
+        throw new IllegalArgumentException("AWS " +
+          "Access Key ID must be specified " +
+          "as the username of a s3 URL, or by setting the " +
+          "fs.s3.awsAccessKeyId property.");
+      }
+      if (secretKey == null) {
+        throw new IllegalArgumentException("AWS " +
+          "Secret Access Key must be specified " +
+          "as the password of a s3 URL, or by setting the " +
+          "fs.s3.awsSecretAccessKey property.");
+      }
+
+      this.s3Service = new RestS3Service(new AWSCredentials(keyId, secretKey));
+    } catch (S3ServiceException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+      throw new S3Exception(e);
+    }
+    bucket = new S3Bucket(uri.getHost());
+  }
+
+  /**
+   * Copies every inode found in <code>oldStore</code> into
+   * <code>newStore</code> and removes it from <code>oldStore</code>.
+   *
+   * @param oldStore store that reads and deletes inodes in the old layout
+   * @param newStore store that writes inodes in the new layout
+   * @throws IOException if listing, reading, writing or deleting fails
+   */
+  private void migrate(Store oldStore, FileSystemStore newStore)
+    throws IOException {
+    for (Path path : oldStore.listAllPaths()) {
+      INode inode = oldStore.retrieveINode(path);
+      // Write the new copy before deleting the old one, so a failed write
+      // cannot lose the inode. This relies on the two layouts using
+      // different keys for the same path, as the URL-encoded unversioned
+      // layout does; a future Store that shares keys with the new layout
+      // must not be migrated with this ordering.
+      newStore.storeINode(path, inode);
+      oldStore.deleteINode(path);
+    }
+  }
+
+  /**
+   * Fetches the object stored under the given key in the bucket.
+   *
+   * @param key the S3 key to look up
+   * @return the object, or <code>null</code> if S3 reports "NoSuchKey"
+   * @throws IOException if the request fails with an I/O cause; any other
+   *         service failure is rethrown wrapped in an {@link S3Exception}
+   */
+  private S3Object get(String key) throws IOException {
+    try {
+      return s3Service.getObject(bucket, key);
+    } catch (S3ServiceException e) {
+      // Constant-first comparison: the error code may be null when the
+      // failure did not come from an S3 error response.
+      if ("NoSuchKey".equals(e.getS3ErrorCode())) {
+        return null;
+      }
+      // Anything else (bad credentials, network failure, ...) must not be
+      // mistaken for "object absent" by the version detection in run().
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw new S3Exception(e);
+    }
+  }
+
+  /**
+   * The operations the migration needs from a store holding inodes in an
+   * older layout: enumerate the paths, read an inode, delete an inode.
+   */
+  interface Store {
+
+    /** Reads the inode stored for the given path. */
+    INode retrieveINode(Path path) throws IOException;
+
+    /** Removes the inode stored for the given path. */
+    void deleteINode(Path path) throws IOException;
+
+    /** Returns the paths to be migrated from this store. */
+    Set<Path> listAllPaths() throws IOException;
+
+  }
+
+ class UnversionedStore implements Store {
+
+    /**
+     * Lists every object whose key starts with the URL-encoded path
+     * separator and returns the corresponding paths in sorted order.
+     *
+     * @throws IOException if the listing fails with an I/O cause
+     */
+    public Set<Path> listAllPaths() throws IOException {
+      S3Object[] listing;
+      try {
+        String rootPrefix = urlEncode(Path.SEPARATOR);
+        listing = s3Service.listObjects(bucket, rootPrefix, null);
+      } catch (S3ServiceException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof IOException) {
+          throw (IOException) cause;
+        }
+        throw new S3Exception(e);
+      }
+
+      Set<Path> paths = new TreeSet<Path>();
+      for (S3Object entry : listing) {
+        paths.add(keyToPath(entry.getKey()));
+      }
+      return paths;
+    }
+
+ public void deleteINode(Path path) throws IOException {
+ delete(pathToKey(path));
+ }
+
+    /**
+     * Deletes the object with the given key from the bucket.
+     *
+     * @throws IOException if the service failure has an I/O cause; other
+     *         service failures are wrapped in an {@link S3Exception}
+     */
+    private void delete(String key) throws IOException {
+      try {
+        s3Service.deleteObject(bucket, key);
+      } catch (S3ServiceException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof IOException) {
+          throw (IOException) cause;
+        }
+        throw new S3Exception(e);
+      }
+    }
+
+ public INode retrieveINode(Path path) throws IOException {
+ return INode.deserialize(get(pathToKey(path)));
+ }
+
+    /**
+     * Opens the data stream of the object stored under the given key.
+     *
+     * @param key the S3 key to read
+     * @return the object's data stream, or <code>null</code> if S3 reports
+     *         "NoSuchKey"
+     * @throws IOException if the service failure has an I/O cause; other
+     *         service failures are wrapped in an {@link S3Exception}
+     */
+    private InputStream get(String key) throws IOException {
+      try {
+        S3Object object = s3Service.getObject(bucket, key);
+        return object.getDataInputStream();
+      } catch (S3ServiceException e) {
+        // Constant-first comparison avoids a NullPointerException when the
+        // exception carries no S3 error code.
+        if ("NoSuchKey".equals(e.getS3ErrorCode())) {
+          return null;
+        }
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        throw new S3Exception(e);
+      }
+    }
+
+    /**
+     * Converts an absolute path to the URL-encoded key used by this layout.
+     *
+     * @throws IllegalArgumentException if the path is not absolute
+     */
+    private String pathToKey(Path path) {
+      if (path.isAbsolute()) {
+        String rawPath = path.toUri().getPath();
+        return urlEncode(rawPath);
+      }
+      throw new IllegalArgumentException("Path must be absolute: " + path);
+    }
+
+    /** Converts a URL-encoded key back to the path it represents. */
+    private Path keyToPath(String key) {
+      String decodedKey = urlDecode(key);
+      return new Path(decodedKey);
+    }
+
+    /** URL-encodes the given string using UTF-8. */
+    private String urlEncode(String s) {
+      String encoded;
+      try {
+        encoded = URLEncoder.encode(s, "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        // Not expected: UTF-8 is a charset every Java platform must provide.
+        throw new IllegalStateException(e);
+      }
+      return encoded;
+    }
+
+    /** Decodes a URL-encoded string using UTF-8. */
+    private String urlDecode(String s) {
+      String decoded;
+      try {
+        decoded = URLDecoder.decode(s, "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        // Not expected: UTF-8 is a charset every Java platform must provide.
+        throw new IllegalStateException(e);
+      }
+      return decoded;
+    }
+
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystemException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystemException.java?view=auto&rev=534595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystemException.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystemException.java Wed May 2 12:16:06 2007
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3;
+
+import java.io.IOException;
+
+/**
+ * Thrown when there is a fatal exception while using {@link S3FileSystem}.
+ */
+public class S3FileSystemException extends IOException {
+
+  // IOException is Serializable; pin the serial form explicitly.
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates the exception with the given detail message.
+   *
+   * @param message description of the failure
+   */
+  public S3FileSystemException(String message) {
+    super(message);
+  }
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/VersionMismatchException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/VersionMismatchException.java?view=auto&rev=534595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/VersionMismatchException.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/VersionMismatchException.java Wed May 2 12:16:06 2007
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3;
+
+/**
+ * Thrown when Hadoop cannot read the version of the data stored
+ * in {@link S3FileSystem}.
+ */
+public class VersionMismatchException extends S3FileSystemException {
+ public VersionMismatchException(String clientVersion, String dataVersion) {
+ super("Version mismatch: client expects version " + clientVersion +
+ ", but data has version " +
+ (dataVersion == null ? "[unversioned]" : dataVersion));
+ }
+}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java?view=diff&rev=534595&r1=534594&r2=534595
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java Wed May 2 12:16:06 2007
@@ -33,6 +33,10 @@
public void initialize(URI uri, Configuration conf) {
this.conf = conf;
}
+
+ /**
+ * Returns the fixed version string "0" for this in-memory store.
+ */
+ public String getVersion() throws IOException {
+ return "0";
+ }
public void deleteINode(Path path) throws IOException {
inodes.remove(path);