Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/06/05 09:27:13 UTC
svn commit: r663487 - in /hadoop/core/trunk: ./ bin/ conf/
src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/util/
src/test/org/apache/hadoop/fs/
Author: ddas
Date: Thu Jun 5 00:27:13 2008
New Revision: 663487
URL: http://svn.apache.org/viewvc?rev=663487&view=rev
Log:
HADOOP-3307. Support for Archives in Hadoop. Contributed by Mahadev Konar.
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/fs/HarFileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/util/HadoopArchives.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestHarFileSystem.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/bin/hadoop
hadoop/core/trunk/conf/hadoop-default.xml
hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663487&r1=663486&r2=663487&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jun 5 00:27:13 2008
@@ -141,6 +141,8 @@
HADOOP-3187. Quotas for namespace management. (Hairong Kuang via ddas)
+ HADOOP-3307. Support for Archives in Hadoop. (Mahadev Konar via ddas)
+
IMPROVEMENTS
HADOOP-2928. Remove deprecated FileSystem.getContentLength().
Modified: hadoop/core/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/bin/hadoop?rev=663487&r1=663486&r2=663487&view=diff
==============================================================================
--- hadoop/core/trunk/bin/hadoop (original)
+++ hadoop/core/trunk/bin/hadoop Thu Jun 5 00:27:13 2008
@@ -233,6 +233,9 @@
elif [ "$COMMAND" = "daemonlog" ] ; then
CLASS=org.apache.hadoop.log.LogLevel
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
+elif [ "$COMMAND" = "archive" ] ; then
+ CLASS=org.apache.hadoop.util.HadoopArchives
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
else
CLASS=$COMMAND
fi
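With this hook, bin/hadoop dispatches the new "archive" subcommand to the HadoopArchives tool added below. Going by the tool's usage string ("archive -archiveName NAME <src>* <dest>"), an invocation would look roughly like the following; the paths are hypothetical:

    bin/hadoop archive -archiveName foo.har /user/hadoop/dir1 /user/hadoop/dir2 /user/outputdir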
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=663487&r1=663486&r2=663487&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Thu Jun 5 00:27:13 2008
@@ -210,6 +210,12 @@
</property>
<property>
+ <name>fs.har.impl</name>
+ <value>org.apache.hadoop.fs.HarFileSystem</value>
+ <description>The filesystem for Hadoop archives. </description>
+</property>
+
+<property>
<name>fs.inmemory.size.mb</name>
<value>75</value>
<description>The size of the in-memory filesystem instance in MB</description>
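The fs.har.impl key follows the usual fs.<scheme>.impl convention that FileSystem uses to resolve a URI scheme to an implementation class. A minimal sketch of a client picking up HarFileSystem through this mapping; the archive path and authority are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HarSchemeExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "har" resolves to org.apache.hadoop.fs.HarFileSystem via fs.har.impl
        Path p = new Path("har://hdfs-namenode:8020/user/foo.har/some/file");
        FileSystem harFs = p.getFileSystem(conf);
        System.out.println(harFs.getUri());
      }
    }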
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java?rev=663487&r1=663486&r2=663487&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java Thu Jun 5 00:27:13 2008
@@ -44,6 +44,12 @@
protected FileSystem fs;
+ /*
+ * a no-argument constructor, so that extending classes
+ * can create an instance and set the underlying fs later
+ */
+ public FilterFileSystem() {
+ }
+
public FilterFileSystem(FileSystem fs) {
this.fs = fs;
}
Added: hadoop/core/trunk/src/java/org/apache/hadoop/fs/HarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/HarFileSystem.java?rev=663487&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/HarFileSystem.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/HarFileSystem.java Thu Jun 5 00:27:13 2008
@@ -0,0 +1,874 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This is an implementation of the Hadoop Archive
+ * Filesystem. The archive contains index files of the
+ * form _index* and data files of the form part-*.
+ * The index files store the locations of the archived
+ * files. There are two index files: _masterindex and
+ * _index. The master index is a level of indirection
+ * into the index file, used to make lookups faster. The
+ * index file is sorted by the hash code of the paths it
+ * contains, and the master index maps ranges of hash
+ * codes to positions in the index file.
+ */
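To make the layout concrete, an archive created by the HadoopArchives tool below would contain files along these lines; the names and line formats are taken from the code in this commit, the offsets are illustrative:

    foo.har/_masterindex   a version line, then "startHash endHash startPos endPos" per range of index entries
    foo.har/_index         one line per archived path, sorted by the hash of the path
    foo.har/part-0         the bytes of the archived files, concatenated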
+
+public class HarFileSystem extends FilterFileSystem {
+ public static final int VERSION = 1;
+ // uri representation of this Har filesystem
+ private URI uri;
+ // the version of this har filesystem
+ private int version;
+ // underlying uri
+ private URI underLyingURI;
+ // the top level path of the archive
+ // in the underlying file system
+ private Path archivePath;
+ // the masterIndex of the archive
+ private Path masterIndex;
+ // the index file
+ private Path archiveIndex;
+ // the har auth
+ private String harAuth;
+
+ /**
+ * Public no-argument constructor for a HarFileSystem,
+ * used when the filesystem is instantiated via reflection
+ * (see fs.har.impl in hadoop-default.xml).
+ */
+ public HarFileSystem() {
+ }
+
+ /**
+ * Constructor to create a HarFileSystem with an
+ * underlying filesystem.
+ * @param fs
+ */
+ public HarFileSystem(FileSystem fs) {
+ super(fs);
+ }
+
+ /**
+ * Initialize a Har filesystem per har archive. The
+ * archive home directory is the top level directory
+ * in the filesystem that contains the HAR archive.
+ * Be careful with this method: you do not want to
+ * create a new FileSystem instance per call to
+ * path.getFileSystem().
+ * A Har URI has the form
+ * har://underlyingfsscheme-host:port/archivepath
+ * or har:///archivepath, in which case the configured
+ * default filesystem is used as the underlying filesystem.
+ */
+ public void initialize(URI name, Configuration conf) throws IOException {
+ //decode the name
+ underLyingURI = decodeHarURI(name, conf);
+ // we got the right har Path - now check if this is
+ // truly a har filesystem
+ Path harPath = archivePath(new Path(name.toString()));
+ if (harPath == null) {
+ throw new IOException("Invalid path for the Har Filesystem. " +
+ name.toString());
+ }
+ if (fs == null) {
+ fs = FileSystem.get(underLyingURI, conf);
+ }
+ this.uri = harPath.toUri();
+ this.archivePath = new Path(this.uri.getPath());
+ this.harAuth = getHarAuth(this.underLyingURI);
+ //check for the underlying fs containing
+ // the index file
+ this.masterIndex = new Path(archivePath, "_masterindex");
+ this.archiveIndex = new Path(archivePath, "_index");
+ if (!fs.exists(masterIndex) || !fs.exists(archiveIndex)) {
+ throw new IOException("Invalid path for the Har Filesystem. " +
+ "No index file in " + harPath);
+ }
+ try{
+ this.version = getHarVersion();
+ } catch(IOException io) {
+ throw new IOException("Unable to " +
+ "read the version of the Har file system: " + this.archivePath);
+ }
+ if (this.version != HarFileSystem.VERSION) {
+ throw new IOException("Invalid version " +
+ this.version + " expected " + HarFileSystem.VERSION);
+ }
+ }
+
+ // get the version of the filesystem from the masterindex file
+ // the version is currently not used, since this is the first
+ // version of archives
+ public int getHarVersion() throws IOException {
+ FSDataInputStream masterIn = fs.open(masterIndex);
+ LineRecordReader.LineReader lmaster = new LineRecordReader.LineReader(
+ masterIn, getConf());
+ Text line = new Text();
+ lmaster.readLine(line);
+ try {
+ masterIn.close();
+ } catch(IOException e){
+ //disregard it.
+ // its a read.
+ }
+ String versionLine = line.toString();
+ String[] arr = versionLine.split(" ");
+ int version = Integer.parseInt(arr[0]);
+ return version;
+ }
+
+ /*
+ * find the ancestor of p that is the archive path:
+ * the deepest path prefix whose last segment ends
+ * with .har is returned, or null if there is none.
+ */
+ private Path archivePath(Path p) {
+ Path retPath = null;
+ Path tmp = p;
+ for (int i=0; i< p.depth(); i++) {
+ if (tmp.toString().endsWith(".har")) {
+ retPath = tmp;
+ break;
+ }
+ tmp = tmp.getParent();
+ }
+ return retPath;
+ }
+
+ /**
+ * decode the raw URI to get the underlying URI
+ * @param rawURI raw Har URI
+ * @param conf the configuration used to resolve the default filesystem
+ * @return URI of the underlying filesystem
+ */
+ private URI decodeHarURI(URI rawURI, Configuration conf) throws IOException {
+ String tmpAuth = rawURI.getAuthority();
+ //we are using the default file
+ //system in the config
+ //so create a underlying uri and
+ //return it
+ if (tmpAuth == null) {
+ //create a path
+ return FileSystem.getDefaultUri(conf);
+ }
+ String host = rawURI.getHost();
+ String[] str = host.split("-", 2);
+ if (str[0] == null) {
+ throw new IOException("URI: " + rawURI + " is an invalid Har URI.");
+ }
+ String underLyingScheme = str[0];
+ String underLyingHost = (str.length > 1)? str[1]:null;
+ int underLyingPort = rawURI.getPort();
+ String auth = (underLyingHost == null && underLyingPort == -1)?
+ null:(underLyingHost+":"+underLyingPort);
+ URI tmp = null;
+ if (rawURI.getQuery() != null) {
+ // query component not allowed
+ throw new IOException("query component in Path not supported " + rawURI);
+ }
+ try {
+ tmp = new URI(underLyingScheme, auth, rawURI.getPath(),
+ rawURI.getQuery(), rawURI.getFragment());
+ } catch (URISyntaxException e) {
+ // should not happen; the components come from a parsed URI
+ }
+ return tmp;
+ }
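A worked example of the decoding above, with a hypothetical authority: for har://hdfs-namenode:8020/user/foo.har, the host component "hdfs-namenode" splits into the underlying scheme "hdfs" and host "namenode", the port is carried over, and the decoded underlying URI is hdfs://namenode:8020/user/foo.har. For har:///user/foo.har there is no authority, so the configured default filesystem URI is returned instead.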
+
+ /**
+ * return the top level archive.
+ */
+ public Path getWorkingDirectory() {
+ return new Path(uri.toString());
+ }
+
+ /**
+ * Create a har specific auth of the form
+ * underlyingfsscheme-host:port
+ * @param underLyingUri the uri of the underlying
+ * filesystem
+ * @return har specific auth
+ */
+ private String getHarAuth(URI underLyingUri) {
+ String auth = underLyingUri.getScheme() + "-";
+ if (underLyingUri.getHost() != null) {
+ auth += underLyingUri.getHost() + ":";
+ if (underLyingUri.getPort() != -1) {
+ auth += underLyingUri.getPort();
+ }
+ }
+ else {
+ auth += ":";
+ }
+ return auth;
+ }
+
+ /**
+ * Returns the uri of this filesystem.
+ * The uri is of the form
+ * har://underlyingfsscheme-host:port/pathintheunderlyingfs
+ */
+ @Override
+ public URI getUri() {
+ return this.uri;
+ }
+
+ /**
+ * Returns the path inside the har filesystem:
+ * the portion of the path under the archive root,
+ * expressed as an absolute path within the archive.
+ * @param path the fully qualified path in the har filesystem.
+ * @return the path within the archive, or null if the
+ * path is not under the archive.
+ */
+ private Path getPathInHar(Path path) {
+ Path harPath = new Path(path.toUri().getPath());
+ if (archivePath.compareTo(harPath) == 0)
+ return new Path(Path.SEPARATOR);
+ Path tmp = new Path(harPath.getName());
+ Path parent = harPath.getParent();
+ while (!(parent.compareTo(archivePath) == 0)) {
+ if (parent.toString().equals(Path.SEPARATOR)) {
+ tmp = null;
+ break;
+ }
+ tmp = new Path(parent.getName(), tmp);
+ parent = parent.getParent();
+ }
+ if (tmp != null)
+ tmp = new Path(Path.SEPARATOR, tmp);
+ return tmp;
+ }
+
+ // re-root p under initial, i.e. strip the leading '/'
+ // and resolve against initial. Parsing and string
+ // manipulation are error prone - so just use the
+ // Path API to do it.
+ private Path makeRelative(String initial, Path p) {
+ Path root = new Path(Path.SEPARATOR);
+ if (root.compareTo(p) == 0)
+ return new Path(initial);
+ Path retPath = new Path(p.getName());
+ Path parent = p.getParent();
+ for (int i=0; i < p.depth()-1; i++) {
+ retPath = new Path(parent.getName(), retPath);
+ parent = parent.getParent();
+ }
+ return new Path(initial, retPath.toString());
+ }
+
+ /* this makes a path qualified in the har filesystem
+ * (non-Javadoc)
+ * @see org.apache.hadoop.fs.FilterFileSystem#makeQualified(
+ * org.apache.hadoop.fs.Path)
+ */
+ @Override
+ public Path makeQualified(Path path) {
+ // make sure that we just get the
+ // path component
+ Path fsPath = path;
+ if (!path.isAbsolute()) {
+ fsPath = new Path(archivePath, path);
+ }
+
+ URI tmpURI = fsPath.toUri();
+ fsPath = new Path(tmpURI.getPath());
+ //change this to Har uri
+ URI tmp = null;
+ try {
+ tmp = new URI(uri.getScheme(), harAuth, fsPath.toString(),
+ tmpURI.getQuery(), tmpURI.getFragment());
+ } catch(URISyntaxException ue) {
+ LOG.error("Error in URI ", ue);
+ }
+ if (tmp != null) {
+ return new Path(tmp.toString());
+ }
+ return null;
+ }
+
+ /**
+ * get block locations from the underlying fs
+ * @param f the input path for the blocks
+ * @param start the start in the file
+ * @param len the length in the file
+ * @return block locations for this segment of file
+ * @throws IOException
+ */
+ @Override
+ public BlockLocation[] getFileBlockLocations(Path f, long start,
+ long len) throws IOException {
+ // need to look up the file in the underlying fs
+ // look up the index
+
+ // make sure this is a part of this har filesystem
+ Path p = makeQualified(f);
+ Path harPath = getPathInHar(p);
+ String line = fileStatusInIndex(harPath);
+ if (line == null) {
+ throw new FileNotFoundException("File " + f + " not found");
+ }
+ HarStatus harStatus = new HarStatus(line);
+ if (harStatus.isDir())
+ return new BlockLocation[0];
+ return fs.getFileBlockLocations(new Path(archivePath, harStatus.getPartName()),
+ harStatus.getStartIndex(), harStatus.getLength());
+ }
+
+ /**
+ * the hash of the path p inside
+ * the har filesystem
+ * @param p the path in the har filesystem
+ * @return the hash code of the path.
+ */
+ public static int getHarHash(Path p) {
+ return (p.toString().hashCode() & 0x7fffffff);
+ }
+
+ static class Store {
+ public Store() {
+ begin = end = startHash = endHash = 0;
+ }
+ public Store(long begin, long end, int startHash, int endHash) {
+ this.begin = begin;
+ this.end = end;
+ this.startHash = startHash;
+ this.endHash = endHash;
+ }
+ public long begin;
+ public long end;
+ public int startHash;
+ public int endHash;
+ }
+
+ // make sure that this harPath is relative to the har filesystem
+ // this only works for relative paths. This returns the line matching
+ // the file in the index. Returns null if there is no matching
+ // filename in the index file.
+ private String fileStatusInIndex(Path harPath) throws IOException {
+ // read the index file
+ int hashCode = getHarHash(harPath);
+ // get the master index to find the pos
+ // in the index file
+ FSDataInputStream in = fs.open(masterIndex);
+ FileStatus masterStat = fs.getFileStatus(masterIndex);
+ LineRecordReader.LineReader lin = new LineRecordReader.LineReader(in,
+ getConf());
+ Text line = new Text();
+ long read = lin.readLine(line);
+ //ignore the first line. this is the header of the index files
+ String[] readStr = null;
+ List<Store> stores = new ArrayList<Store>();
+ while(read < masterStat.getLen()) {
+ int b = lin.readLine(line);
+ read += b;
+ readStr = line.toString().split(" ");
+ int startHash = Integer.parseInt(readStr[0]);
+ int endHash = Integer.parseInt(readStr[1]);
+ if (startHash <= hashCode && hashCode <= endHash) {
+ stores.add(new Store(Long.parseLong(readStr[2]),
+ Long.parseLong(readStr[3]), startHash,
+ endHash));
+ }
+ line.clear();
+ }
+ try {
+ lin.close();
+ } catch(IOException io){
+ // do nothing just a read.
+ }
+ FSDataInputStream aIn = fs.open(archiveIndex);
+ LineRecordReader.LineReader aLin = new LineRecordReader.LineReader(aIn,
+ getConf());
+ String retStr = null;
+ // now start reading the real index file
+ for (Store s: stores) {
+ // reset the byte count for each store's range
+ read = 0;
+ aIn.seek(s.begin);
+ while (read + s.begin < s.end) {
+ int tmp = aLin.readLine(line);
+ read += tmp;
+ String lineFeed = line.toString();
+ String[] parsed = lineFeed.split(" ");
+ if (harPath.compareTo(new Path(parsed[0])) == 0) {
+ // bingo!
+ retStr = lineFeed;
+ break;
+ }
+ line.clear();
+ }
+ if (retStr != null)
+ break;
+ }
+ try {
+ aIn.close();
+ } catch(IOException io) {
+ //do nothing
+ }
+ return retStr;
+ }
+
+ // a single line parser for hadoop archives status
+ // stored in a single line in the index files
+ // the format is of the form
+ // filename "dir"/"file" partFileName startIndex length
+ // <space separated children>
+ private static class HarStatus {
+ boolean isDir;
+ String name;
+ List<String> children;
+ String partName;
+ long startIndex;
+ long length;
+ public HarStatus(String harString) {
+ String[] splits = harString.split(" ");
+ this.name = splits[0];
+ this.isDir = "dir".equals(splits[1]);
+ // this is equal to "none" if it is a directory
+ this.partName = splits[2];
+ this.startIndex = Long.parseLong(splits[3]);
+ this.length = Long.parseLong(splits[4]);
+ if (isDir) {
+ children = new ArrayList<String>();
+ for (int i = 5; i < splits.length; i++) {
+ children.add(splits[i]);
+ }
+ }
+ }
+ public boolean isDir() {
+ return isDir;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public List<String> getChildren() {
+ return children;
+ }
+ public String getFileName() {
+ return name;
+ }
+ public String getPartName() {
+ return partName;
+ }
+ public long getStartIndex() {
+ return startIndex;
+ }
+ public long getLength() {
+ return length;
+ }
+ }
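Given the formats written by the HadoopArchives mapper later in this commit, the index lines this parser consumes would look like the following; the paths and numbers are illustrative:

    /user/hadoop dir none 0 0 a b c
    /user/hadoop/a file part-0 0 1

The first is a directory entry (partName "none", zero start and length, followed by the child names); the second is a file of length 1 stored at offset 0 of part-0.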
+
+ /**
+ * return the filestatus of files in the har archive.
+ * The permissions returned are those of the archive
+ * index file; permissions are not persisted
+ * while creating a hadoop archive.
+ * @param f the path in the har filesystem
+ * @return filestatus.
+ * @throws IOException
+ */
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ FileStatus archiveStatus = fs.getFileStatus(archiveIndex);
+ // get the fs DataInputStream for the underlying file
+ // look up the index.
+ Path p = makeQualified(f);
+ Path harPath = getPathInHar(p);
+ if (harPath == null) {
+ throw new IOException("Invalid file name: " + f + " in " + uri);
+ }
+ String readStr = fileStatusInIndex(harPath);
+ if (readStr == null) {
+ throw new FileNotFoundException("File: " + f + " does not exist in " + uri);
+ }
+ HarStatus hstatus = new HarStatus(readStr);
+ return new FileStatus(hstatus.isDir()?0:hstatus.getLength(), hstatus.isDir(),
+ (int)archiveStatus.getReplication(), archiveStatus.getBlockSize(),
+ archiveStatus.getModificationTime(), new FsPermission(
+ archiveStatus.getPermission()), archiveStatus.getOwner(),
+ archiveStatus.getGroup(),
+ makeRelative(this.uri.toString(), new Path(hstatus.name)));
+ }
+
+ /**
+ * Returns a har input stream which fakes end of
+ * file. It reads the index files to get the part
+ * file name and the start and length of the file
+ * within it.
+ */
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ // get the fs DataInputStream for the underlying file
+ // look up the index.
+ Path p = makeQualified(f);
+ Path harPath = getPathInHar(p);
+ if (harPath == null) {
+ throw new IOException("Invalid file name: " + f + " in " + uri);
+ }
+ String readStr = fileStatusInIndex(harPath);
+ if (readStr == null) {
+ throw new FileNotFoundException(f + ": not found in " + archivePath);
+ }
+ HarStatus hstatus = new HarStatus(readStr);
+ // we got it.. woo hooo!!!
+ if (hstatus.isDir()) {
+ throw new FileNotFoundException(f + " : not a file in " +
+ archivePath);
+ }
+ return new HarFSDataInputStream(fs, new Path(archivePath,
+ hstatus.getPartName()),
+ hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
+ }
+
+ /*
+ * create throws an exception in Har filesystem.
+ * The archive once created cannot be changed.
+ */
+ public FSDataOutputStream create(Path f, int bufferSize)
+ throws IOException {
+ throw new IOException("Har: Create not implemented");
+ }
+
+ public FSDataOutputStream create(Path f,
+ FsPermission permission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException {
+ throw new IOException("Har: create not implemented.");
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch(IOException ie) {
+ //this might already be closed
+ // ignore
+ }
+ }
+ }
+
+ /**
+ * Not implemented.
+ */
+ @Override
+ public boolean setReplication(Path src, short replication) throws IOException{
+ throw new IOException("Har: setreplication not implemented");
+ }
+
+ /**
+ * Not implemented.
+ */
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ throw new IOException("Har: delete not implemented");
+ }
+
+ /**
+ * liststatus returns the children of a directory
+ * after looking up the index files.
+ */
+ @Override
+ public FileStatus[] listStatus(Path f) throws IOException {
+ // look the file up in the index files and create
+ // fake filestatuses to return to the client
+ List<FileStatus> statuses = new ArrayList<FileStatus>();
+ FileStatus archiveStatus = fs.getFileStatus(archiveIndex);
+ Path tmpPath = makeQualified(f);
+ Path harPath = getPathInHar(tmpPath);
+ String readStr = fileStatusInIndex(harPath);
+ if (readStr == null) {
+ throw new FileNotFoundException("File " + f + " not found in " + archivePath);
+ }
+ HarStatus hstatus = new HarStatus(readStr);
+ if (!hstatus.isDir())
+ statuses.add(new FileStatus(hstatus.getLength(),
+ hstatus.isDir(),
+ archiveStatus.getReplication(), archiveStatus.getBlockSize(),
+ archiveStatus.getModificationTime(),
+ new FsPermission(archiveStatus.getPermission()),
+ archiveStatus.getOwner(), archiveStatus.getGroup(),
+ makeRelative(this.uri.toString(), new Path(hstatus.name))));
+ else
+ for (String child: hstatus.children) {
+ FileStatus tmp = getFileStatus(new Path(tmpPath, child));
+ statuses.add(tmp);
+ }
+ return statuses.toArray(new FileStatus[statuses.size()]);
+ }
+
+ /**
+ * return the top level archive path.
+ */
+ public Path getHomeDirectory() {
+ return new Path(uri.toString());
+ }
+
+ public void setWorkingDirectory(Path newDir) {
+ //does nothing.
+ }
+
+ /**
+ * not implemented.
+ */
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ throw new IOException("Har: mkdirs not implemented");
+ }
+
+ /**
+ * not implemented.
+ */
+ public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws
+ IOException {
+ throw new IOException("Har: copyfromlocalfile not implemented");
+ }
+
+ /**
+ * copies the file in the har filesystem to a local file.
+ */
+ public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+ throws IOException {
+ FileUtil.copy(this, src, getLocal(getConf()), dst, false, getConf());
+ }
+
+ /**
+ * not implemented.
+ */
+ public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ throw new IOException("Har: startLocalOutput not implemented");
+ }
+
+ /**
+ * not implemented.
+ */
+ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ throw new IOException("Har: completeLocalOutput not implemented");
+ }
+
+ /**
+ * not implemented.
+ */
+ public void setOwner(Path p, String username, String groupname)
+ throws IOException {
+ throw new IOException("Har: setowner not implemented");
+ }
+
+ /**
+ * Not implemented.
+ */
+ public void setPermission(Path p, FsPermission permisssion)
+ throws IOException {
+ throw new IOException("Har: setPermission not implemented");
+ }
+
+ /**
+ * Hadoop archives input stream. This input stream fakes EOF
+ * since archive files are part of bigger part files.
+ */
+ private static class HarFSDataInputStream extends FSDataInputStream {
+ /**
+ * Create an input stream that fakes all the reads/positions/seeking.
+ */
+ private static class HarFsInputStream extends FSInputStream {
+ private long position, start, end;
+ //The underlying data input stream that the
+ // underlying filesystem will return.
+ private FSDataInputStream underLyingStream;
+ //one byte buffer
+ private byte[] oneBytebuff = new byte[1];
+ HarFsInputStream(FileSystem fs, Path path, long start,
+ long length, int bufferSize) throws IOException {
+ underLyingStream = fs.open(path, bufferSize);
+ underLyingStream.seek(start);
+ // the start of this file in the part file
+ this.start = start;
+ // the position pointer in the part file
+ this.position = start;
+ // the end pointer in the part file
+ this.end = start + length;
+ }
+
+ public synchronized int available() throws IOException {
+ long remaining = end - underLyingStream.getPos();
+ if (remaining > (long)Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ }
+ return (int) remaining;
+ }
+
+ public synchronized void close() throws IOException {
+ underLyingStream.close();
+ super.close();
+ }
+
+ //not implemented
+ @Override
+ public void mark(int readLimit) {
+ // do nothing
+ }
+
+ /**
+ * reset is not implemented
+ */
+ public void reset() throws IOException {
+ throw new IOException("reset not implemented.");
+ }
+
+ public synchronized int read() throws IOException {
+ int ret = read(oneBytebuff, 0, 1);
+ return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff);
+ }
+
+ public synchronized int read(byte[] b) throws IOException {
+ // the three-argument read below already advances position
+ return read(b, 0, b.length);
+ }
+
+ /**
+ * Read up to len bytes, clamped so that reads never
+ * cross the end of this archived file's region.
+ */
+ public synchronized int read(byte[] b, int offset, int len)
+ throws IOException {
+ int newlen = len;
+ int ret = -1;
+ if (position + len > end) {
+ newlen = (int) (end - position);
+ }
+ // end case
+ if (newlen == 0)
+ return ret;
+ ret = underLyingStream.read(b, offset, newlen);
+ // do not advance the position past end-of-stream
+ if (ret > 0) {
+ position += ret;
+ }
+ return ret;
+ }
+
+ public synchronized long skip(long n) throws IOException {
+ long tmpN = n;
+ if (tmpN > 0) {
+ if (position + tmpN > end) {
+ tmpN = end - position;
+ }
+ underLyingStream.seek(tmpN + position);
+ position += tmpN;
+ return tmpN;
+ }
+ return (tmpN < 0)? -1 : 0;
+ }
+
+ public synchronized long getPos() throws IOException {
+ return (position - start);
+ }
+
+ public synchronized void seek(long pos) throws IOException {
+ if (pos < 0 || (start + pos > end)) {
+ throw new IOException("Failed to seek: EOF");
+ }
+ position = start + pos;
+ underLyingStream.seek(position);
+ }
+
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ // no need to implement this:
+ // hdfs itself does seekToNewSource
+ // while reading.
+ return false;
+ }
+
+ /**
+ * implementing position readable.
+ */
+ public int read(long pos, byte[] b, int offset, int length)
+ throws IOException {
+ int nlength = length;
+ if (start + nlength + pos > end) {
+ nlength = (int) (end - (start + pos));
+ }
+ return underLyingStream.read(pos + start , b, offset, nlength);
+ }
+
+ /**
+ * position readable again.
+ */
+ public void readFully(long pos, byte[] b, int offset, int length)
+ throws IOException {
+ if (start + length + pos > end) {
+ throw new IOException("Not enough bytes to read.");
+ }
+ underLyingStream.readFully(pos + start, b, offset, length);
+ }
+
+ public void readFully(long pos, byte[] b) throws IOException {
+ readFully(pos, b, 0, b.length);
+ }
+
+ }
+
+ /**
+ * constructor for har input stream.
+ * @param fs the underlying filesystem
+ * @param p The path in the underlying filesystem
+ * @param start the start position in the part file
+ * @param length the length of valid data in the part file
+ * @param bufsize the buffer size
+ * @throws IOException
+ */
+ public HarFSDataInputStream(FileSystem fs, Path p, long start,
+ long length, int bufsize) throws IOException {
+ super(new HarFsInputStream(fs, p, start, length, bufsize));
+ }
+
+ /**
+ * constructor for har input stream.
+ * @param fs the underlying filesystem
+ * @param p the path in the underlying file system
+ * @param start the start position in the part file
+ * @param length the length of valid data in the part file.
+ * @throws IOException
+ */
+ public HarFSDataInputStream(FileSystem fs, Path p, long start, long length)
+ throws IOException {
+ super(new HarFsInputStream(fs, p, start, length, 0));
+ }
+ }
+}
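Taken together, the read-only API above supports a sketch like the following for listing and reading archived files; the archive location mirrors the one used in TestHarFileSystem below and is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HarReadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hypothetical archive over /user/hadoop/test, stored in /user/hadoop/tmp
        Path dir = new Path("har:///user/hadoop/tmp/foo.har/user/hadoop/test");
        FileSystem harFs = dir.getFileSystem(conf);
        // listStatus() resolves children through _index and _masterindex
        for (FileStatus stat : harFs.listStatus(dir)) {
          System.out.println(stat.getPath());
        }
        // open() returns a HarFSDataInputStream that fakes EOF at the end
        // of this file's region inside its part file
        FSDataInputStream in = harFs.open(new Path(dir, "a"));
        System.out.println(in.read());
        in.close();
      }
    }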
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=663487&r1=663486&r2=663487&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Thu Jun 5 00:27:13 2008
@@ -851,7 +851,7 @@
}
private static final Random RANDOM = new Random();
- private static String getRandomId() {
+ public static String getRandomId() {
return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
}
Added: hadoop/core/trunk/src/java/org/apache/hadoop/util/HadoopArchives.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/HadoopArchives.java?rev=663487&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/HadoopArchives.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/HadoopArchives.java Thu Jun 5 00:27:13 2008
@@ -0,0 +1,670 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HarFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+
+
+/**
+ * An archive creation utility.
+ * This class provides methods that can be used
+ * to create hadoop archives. For an understanding of
+ * Hadoop archives, see {@link HarFileSystem}.
+ */
+public class HadoopArchives implements Tool {
+ private static final Log LOG = LogFactory.getLog(HadoopArchives.class);
+
+ private static final String NAME = "har";
+ static final String SRC_LIST_LABEL = NAME + ".src.list";
+ static final String DST_DIR_LABEL = NAME + ".dest.path";
+ static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
+ static final String JOB_DIR_LABEL = NAME + ".job.dir";
+ static final String SRC_COUNT_LABEL = NAME + ".src.count";
+ static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
+ static final String DST_HAR_LABEL = NAME + ".archive.name";
+ // size of each part file
+ // it is fixed for now.
+ // note the long literal: the int expression
+ // 2 * 1024 * 1024 * 1024 would overflow to a negative value
+ static final long partSize = 2L * 1024 * 1024 * 1024;
+
+ private static final String usage = "archive"
+ + " -archiveName NAME <src>* <dest>" +
+ "\n";
+
+
+ private JobConf conf;
+
+ public void setConf(Configuration conf) {
+ if (conf instanceof JobConf) {
+ this.conf = (JobConf) conf;
+ } else {
+ this.conf = new JobConf(conf);
+ }
+ }
+
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ public HadoopArchives(Configuration conf) {
+ setConf(conf);
+ }
+
+ // check the src paths
+ private static void checkPaths(Configuration conf, List<Path> paths) throws
+ IOException {
+ for (Path p : paths) {
+ FileSystem fs = p.getFileSystem(conf);
+ if (!fs.exists(p)) {
+ throw new FileNotFoundException("Source " + p + " does not exist.");
+ }
+ }
+ }
+
+ /**
+ * Recursively list p, assuming entries are either
+ * plain files or directories.
+ * @param fs the input filesystem
+ * @param p the top level path
+ * @param out the list of statuses output by the recursive ls
+ * @throws IOException
+ */
+ private void recursivels(FileSystem fs, Path p, List<FileStatus> out)
+ throws IOException {
+ FileStatus fstatus = fs.getFileStatus(p);
+ if (!fstatus.isDir()) {
+ out.add(fstatus);
+ return;
+ }
+ else {
+ out.add(fstatus);
+ FileStatus[] listStatus = fs.listStatus(p);
+ for (FileStatus stat: listStatus) {
+ recursivels(fs, stat.getPath(), out);
+ }
+ }
+ }
+
+ /**
+ * Input format of a hadoop archive job responsible for
+ * generating splits of the file list
+ */
+
+ static class HArchiveInputFormat implements InputFormat<LongWritable, Text> {
+ public void validateInput(JobConf jconf) throws IOException {}
+
+ //generate input splits from the src file lists
+ public InputSplit[] getSplits(JobConf jconf, int numSplits)
+ throws IOException {
+ String srcfilelist = jconf.get(SRC_LIST_LABEL, "");
+ if ("".equals(srcfilelist)) {
+ throw new IOException("Unable to get the " +
+ "src file for archive generation.");
+ }
+ long totalSize = jconf.getLong(TOTAL_SIZE_LABEL, -1);
+ if (totalSize == -1) {
+ throw new IOException("Invalid size of files to archive");
+ }
+ //we should be safe since this is set by our own code
+ Path src = new Path(srcfilelist);
+ FileSystem fs = src.getFileSystem(jconf);
+ FileStatus fstatus = fs.getFileStatus(src);
+ ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ SequenceFile.Reader reader = null;
+ // the remaining bytes in the file split
+ long remaining = fstatus.getLen();
+ // the count of sizes calculated till now
+ long currentCount = 0L;
+ // the endposition of the split
+ long lastPos = 0L;
+ // the start position of the split
+ long startPos = 0L;
+ long targetSize = totalSize/numSplits;
+ // create splits of roughly targetSize so that all the maps
+ // have equal sized data to read and write.
+ try {
+ reader = new SequenceFile.Reader(fs, src, jconf);
+ while(reader.next(key, value)) {
+ if (currentCount + key.get() > targetSize && currentCount != 0){
+ long size = lastPos - startPos;
+ splits.add(new FileSplit(src, startPos, size, (String[]) null));
+ remaining = remaining - size;
+ startPos = lastPos;
+ currentCount = 0L;
+ }
+ currentCount += key.get();
+ lastPos = reader.getPosition();
+ }
+ // add whatever remains as the final split.
+ if (remaining != 0) {
+ splits.add(new FileSplit(src, startPos, remaining, (String[])null));
+ }
+ }
+ finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ return splits.toArray(new FileSplit[splits.size()]);
+ }
+
+ public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter) throws IOException {
+ return new SequenceFileRecordReader<LongWritable, Text>(job,
+ (FileSplit)split);
+ }
+ }
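A small worked example of the split logic above, with hypothetical numbers: for totalSize = 6 GB and numSplits = 3, targetSize is 2 GB. The reader accumulates the per-file lengths (the LongWritable keys of the source list file) and cuts a FileSplit over that file whenever the running total passes 2 GB, so each map ends up copying roughly 2 GB of file data into its part file.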
+
+ private boolean checkValidName(String name) {
+ return name.endsWith(".har");
+ }
+
+
+ private Path largestDepth(List<Path> paths) {
+ Path deepest = paths.get(0);
+ for (Path p: paths) {
+ if (p.depth() > deepest.depth()) {
+ deepest = p;
+ }
+ }
+ return deepest;
+ }
+
+ // this method is tricky. This method writes
+ // the top level directories in such a way that
+ // the output only contains valid directories in archives.
+ // so for an input path specified by the user
+ // as /user/hadoop
+ // we need to index
+ // / as the root
+ // /user as a directory
+ // /user/hadoop as a directory
+ // so for multiple input paths it makes sure that it
+ // does the right thing.
+ // so if the user specifies the input directories as
+ // /user/harry and /user/hadoop
+ // we need to write / and user as its child
+ // and /user and harry and hadoop as its children
+ private void writeTopLevelDirs(SequenceFile.Writer srcWriter,
+ List<Path> paths) throws IOException {
+ //these are qualified paths
+ List<Path> justDirs = new ArrayList<Path>();
+ for (Path p: paths) {
+ if (!p.getFileSystem(getConf()).isFile(p)) {
+ justDirs.add(new Path(p.toUri().getPath()));
+ }
+ else {
+ justDirs.add(new Path(p.getParent().toUri().getPath()));
+ }
+ }
+
+ //get the largest depth path
+ // this is tricky
+ TreeMap<String, HashSet<String>> allpaths = new TreeMap<String, HashSet<String>>();
+ Path deepest = largestDepth(paths);
+ Path root = new Path(Path.SEPARATOR);
+ for (int i = 0; i < deepest.depth(); i++) {
+ List<Path> parents = new ArrayList<Path>();
+ for (Path p: justDirs) {
+ if (p.compareTo(root) == 0){
+ // do nothing
+ }
+ else {
+ Path parent = p.getParent();
+ if (allpaths.containsKey(parent.toString())) {
+ HashSet<String> children = allpaths.get(parent.toString());
+ children.add(p.getName());
+ }
+ else {
+ HashSet<String> children = new HashSet<String>();
+ children.add(p.getName());
+ allpaths.put(parent.toString(), children);
+ }
+ parents.add(parent);
+ }
+ }
+ justDirs = parents;
+ }
+ Set<Map.Entry<String, HashSet<String>>> keyVals = allpaths.entrySet();
+ for (Map.Entry<String, HashSet<String>> entry : keyVals) {
+ HashSet<String> children = entry.getValue();
+ String toWrite = entry.getKey() + " dir ";
+ StringBuffer sbuff = new StringBuffer();
+ sbuff.append(toWrite);
+ for (String child: children) {
+ sbuff.append(child + " ");
+ }
+ toWrite = sbuff.toString();
+ srcWriter.append(new LongWritable(0L), new Text(toWrite));
+ }
+ }
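Following the /user/harry and /user/hadoop example in the comment above, the records this method appends to srcWriter would be, one per parent directory (each with a LongWritable(0) key; the child order comes from a HashSet and is therefore unspecified):

    / dir user
    /user dir harry hadoop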
+
+ /** archive the given source paths into
+ * the dest
+ * @param srcPaths the src paths to be archived
+ * @param archiveName the name of the archive to create
+ * @param dest the dest dir that will contain the archive
+ */
+ public void archive(List<Path> srcPaths, String archiveName, Path dest)
+ throws IOException {
+ boolean isValid = checkValidName(archiveName);
+ if (!isValid) {
+ throw new IOException("Invalid archiveName " + archiveName);
+ }
+ checkPaths(conf, srcPaths);
+ int numFiles = 0;
+ long totalSize = 0;
+ conf.set(DST_HAR_LABEL, archiveName);
+ Path outputPath = new Path(dest, archiveName);
+ FileOutputFormat.setOutputPath(conf, outputPath);
+ conf.set(DST_DIR_LABEL, outputPath.toString());
+ final String randomId = CopyFiles.getRandomId();
+ Path jobDirectory = new Path(conf.getSystemDir(), NAME + "_" + randomId);
+ conf.set(JOB_DIR_LABEL, jobDirectory.toString());
+ //get a tmp directory for input splits
+ FileSystem jobfs = jobDirectory.getFileSystem(conf);
+ jobfs.mkdirs(jobDirectory);
+ Path srcFiles = new Path(jobDirectory, "_har_src_files");
+ conf.set(SRC_LIST_LABEL, srcFiles.toString());
+ SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,
+ srcFiles, LongWritable.class, Text.class,
+ SequenceFile.CompressionType.NONE);
+ // get the list of files
+ // create single list of files and dirs
+ try {
+ // write the top level dirs in first
+ writeTopLevelDirs(srcWriter, srcPaths);
+ // these are the input paths passed
+ // from the command line
+ // we do a recursive ls on these paths
+ // and then write them to the input file
+ // one at a time
+ for (Path src: srcPaths) {
+ FileSystem fs = src.getFileSystem(conf);
+ ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
+ recursivels(fs, src, allFiles);
+ for (FileStatus stat: allFiles) {
+ String toWrite = "";
+ long len = stat.isDir()? 0:stat.getLen();
+ if (stat.isDir()) {
+ toWrite = "" + fs.makeQualified(stat.getPath()) + " dir ";
+ //get the children
+ FileStatus[] list = fs.listStatus(stat.getPath());
+ StringBuffer sbuff = new StringBuffer();
+ sbuff.append(toWrite);
+ for (FileStatus stats: list) {
+ sbuff.append(stats.getPath().getName() + " ");
+ }
+ toWrite = sbuff.toString();
+ }
+ else {
+ toWrite += fs.makeQualified(stat.getPath()) + " file ";
+ }
+ srcWriter.append(new LongWritable(len), new
+ Text(toWrite));
+ numFiles++;
+ totalSize += len;
+ }
+ }
+ } finally {
+ srcWriter.close();
+ }
+ //increase the replication of src files
+ jobfs.setReplication(srcFiles, (short) 10);
+ conf.setInt(SRC_COUNT_LABEL, numFiles);
+ conf.setLong(TOTAL_SIZE_LABEL, totalSize);
+ int numMaps = (int)(totalSize/partSize);
+ // run at least one map.
+ conf.setNumMapTasks(numMaps == 0? 1:numMaps);
+ conf.setNumReduceTasks(1);
+ conf.setInputFormat(HArchiveInputFormat.class);
+ conf.setOutputFormat(NullOutputFormat.class);
+ conf.setMapperClass(HArchivesMapper.class);
+ conf.setReducerClass(HArchivesReducer.class);
+ conf.setMapOutputKeyClass(IntWritable.class);
+ conf.setMapOutputValueClass(Text.class);
+ FileInputFormat.addInputPath(conf, jobDirectory);
+ //make sure no speculative execution is done
+ conf.setSpeculativeExecution(false);
+ JobClient.runJob(conf);
+ //delete the tmp job directory
+ try {
+ jobfs.delete(jobDirectory, true);
+ } catch(IOException ie) {
+ LOG.info("Unable to clean tmp directory " + jobDirectory);
+ }
+ }
+
+ static class HArchivesMapper
+ implements Mapper<LongWritable, Text, IntWritable, Text> {
+ private JobConf conf = null;
+ int partId = -1 ;
+ Path tmpOutputDir = null;
+ Path tmpOutput = null;
+ String partname = null;
+ FSDataOutputStream partStream = null;
+ FileSystem destFs = null;
+ byte[] buffer;
+ int buf_size = 128 * 1024;
+
+ // configure the mapper and create
+ // the part file.
+ // use map reduce framework to write into
+ // tmp files.
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ // this is tightly tied to map reduce
+ // since it does not expose an api
+ // to get the partition
+ partId = conf.getInt("mapred.task.partition", -1);
+ // create a file name using the partition
+ // we need to write to this directory
+ tmpOutputDir = FileOutputFormat.getWorkOutputPath(conf);
+ // get the output path and write to the tmp
+ // directory
+ partname = "part-" + partId;
+ tmpOutput = new Path(tmpOutputDir, partname);
+ try {
+ destFs = tmpOutput.getFileSystem(conf);
+ //this was a stale copy
+ if (destFs.exists(tmpOutput)) {
+ destFs.delete(tmpOutput, false);
+ }
+ partStream = destFs.create(tmpOutput);
+ } catch(IOException ie) {
+ throw new RuntimeException("Unable to open output file " + tmpOutput);
+ }
+ buffer = new byte[buf_size];
+ }
+
+ // copy raw data.
+ public void copyData(Path input, FSDataInputStream fsin,
+ FSDataOutputStream fout, Reporter reporter) throws IOException {
+ try {
+ int cbread;
+ while ((cbread = fsin.read(buffer)) >= 0) {
+ fout.write(buffer, 0, cbread);
+ reporter.progress();
+ }
+ } finally {
+ fsin.close();
+ }
+ }
+
+ // the relative path of p: basically
+ // getting rid of the scheme. Parsing and doing
+ // string manipulation is not good - so
+ // just use the path api to do it.
+ private Path makeRelative(Path p) {
+ Path retPath = new Path(p.toUri().getPath());
+ return retPath;
+ }
+
+ static class MapStat {
+ private String pathname;
+ private boolean isDir;
+ private List<String> children;
+ public MapStat(String line) {
+ String[] splits = line.split(" ");
+ pathname = splits[0];
+ if ("dir".equals(splits[1])) {
+ isDir = true;
+ }
+ else {
+ isDir = false;
+ }
+ if (isDir) {
+ children = new ArrayList<String>();
+ for (int i = 2; i < splits.length; i++) {
+ children.add(splits[i]);
+ }
+ }
+ }
+ }
+ // read files from the split input
+ // and write it onto the part files.
+ // also output hash(name) and string
+ // for reducer to create index
+ // and masterindex files.
+ public void map(LongWritable key, Text value,
+ OutputCollector<IntWritable, Text> out,
+ Reporter reporter) throws IOException {
+ String line = value.toString();
+ MapStat mstat = new MapStat(line);
+ Path srcPath = new Path(mstat.pathname);
+ String towrite = null;
+ Path relPath = makeRelative(srcPath);
+ int hash = HarFileSystem.getHarHash(relPath);
+ long startPos = partStream.getPos();
+ if (mstat.isDir) {
+ towrite = relPath.toString() + " " + "dir none " + 0 + " " + 0 + " ";
+ StringBuffer sbuff = new StringBuffer();
+ sbuff.append(towrite);
+ for (String child: mstat.children) {
+ sbuff.append(child + " ");
+ }
+ towrite = sbuff.toString();
+ //reading directories is also progress
+ reporter.progress();
+ }
+ else {
+ FileSystem srcFs = srcPath.getFileSystem(conf);
+ FileStatus srcStatus = srcFs.getFileStatus(srcPath);
+ FSDataInputStream input = srcFs.open(srcStatus.getPath());
+ reporter.setStatus("Copying file " + srcStatus.getPath() +
+ " to archive.");
+ copyData(srcStatus.getPath(), input, partStream, reporter);
+ towrite = relPath.toString() + " file " + partname + " " + startPos
+ + " " + srcStatus.getLen() + " ";
+ }
+ out.collect(new IntWritable(hash), new Text(towrite));
+ }
+
+ public void close() throws IOException {
+ // close the part files.
+ partStream.close();
+ }
+ }
+
+ /** the reducer for creating the index and the master index
+ */
+ static class HArchivesReducer implements Reducer<IntWritable,
+ Text, Text, Text> {
+ private JobConf conf = null;
+ private long startIndex = 0;
+ private long endIndex = 0;
+ private long startPos = 0;
+ private Path masterIndex = null;
+ private Path index = null;
+ private FileSystem fs = null;
+ private FSDataOutputStream outStream = null;
+ private FSDataOutputStream indexStream = null;
+ private int numIndexes = 1000;
+ private Path tmpOutputDir = null;
+ private int written = 0;
+ private int keyVal = 0;
+
+ // configure
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ tmpOutputDir = FileOutputFormat.getWorkOutputPath(this.conf);
+ masterIndex = new Path(tmpOutputDir, "_masterindex");
+ index = new Path(tmpOutputDir, "_index");
+ try {
+ fs = masterIndex.getFileSystem(conf);
+ if (fs.exists(masterIndex)) {
+ fs.delete(masterIndex, false);
+ }
+ if (fs.exists(index)) {
+ fs.delete(index, false);
+ }
+ indexStream = fs.create(index);
+ outStream = fs.create(masterIndex);
+ String version = HarFileSystem.VERSION + " \n";
+ outStream.write(version.getBytes());
+
+ } catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // create the index and master index. The input to
+ // the reduce is already sorted by the hash of the
+ // files, so we just need to write it to the index.
+ // We add a masterindex entry after every
+ // numIndexes index entries.
+ public void reduce(IntWritable key, Iterator<Text> values,
+ OutputCollector<Text, Text> out,
+ Reporter reporter) throws IOException {
+ keyVal = key.get();
+ while(values.hasNext()) {
+ Text value = values.next();
+ String towrite = value.toString() + "\n";
+ indexStream.write(towrite.getBytes());
+ written++;
+ if (written > numIndexes -1) {
+ // every 1000 indexes we report status
+ reporter.setStatus("Creating index for archives");
+ reporter.progress();
+ endIndex = keyVal;
+ String masterWrite = startIndex + " " + endIndex + " " + startPos
+ + " " + indexStream.getPos() + " \n" ;
+ outStream.write(masterWrite.getBytes());
+ startPos = indexStream.getPos();
+ startIndex = endIndex;
+ written = 0;
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ //write the last part of the master index.
+ if (written > 0) {
+ String masterWrite = startIndex + " " + keyVal + " " + startPos +
+ " " + indexStream.getPos() + " \n";
+ outStream.write(masterWrite.getBytes());
+ }
+ // close the streams
+ outStream.close();
+ indexStream.close();
+ // try increasing the replication
+ fs.setReplication(index, (short) 10);
+ fs.setReplication(masterIndex, (short) 10);
+ }
+
+ }
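Putting the reducer's formats together: _masterindex starts with a version line, then records one line per numIndexes index entries, giving a hash range and the byte span those entries occupy in _index. A lookup in HarFileSystem.fileStatusInIndex() hashes the requested path, selects the masterindex ranges containing that hash, and scans only those byte spans of _index. Illustrative contents:

    1
    0 21312 0 45021
    21400 73208 45021 90023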
+
+ /** the main driver for creating the archives.
+ * It takes at least two command line parameters: the src and the
+ * dest. It does a recursive ls on the source paths.
+ * The mapper creates the archives and the reducer creates
+ * the archive index.
+ */
+
+ public int run(String[] args) throws Exception {
+ List<Path> srcPaths = new ArrayList<Path>();
+ Path destPath = null;
+ // parse and validate the command line arguments
+ String archiveName = null;
+ if (args.length < 2) {
+ System.out.println(usage);
+ throw new IOException("Invalid usage.");
+ }
+ if (!"-archiveName".equals(args[0])) {
+ System.out.println(usage);
+ throw new IOException("Archive Name not specified.");
+ }
+ archiveName = args[1];
+ if (!checkValidName(archiveName)) {
+ throw new IOException("Invalid name for archives. " + archiveName);
+ }
+ for (int i = 2; i < args.length; i++) {
+ if (i == (args.length - 1)) {
+ destPath = new Path(args[i]);
+ }
+ else {
+ srcPaths.add(new Path(args[i]));
+ }
+ }
+ // do a glob on the srcPaths and then pass it on
+ List<Path> globPaths = new ArrayList<Path>();
+ for (Path p: srcPaths) {
+ FileSystem fs = p.getFileSystem(getConf());
+ FileStatus[] statuses = fs.globStatus(p);
+ for (FileStatus status: statuses) {
+ globPaths.add(fs.makeQualified(status.getPath()));
+ }
+ }
+ archive(globPaths, archiveName, destPath);
+ return 0;
+ }
+
+ /** the main function **/
+ public static void main(String[] args) {
+ JobConf job = new JobConf(HadoopArchives.class);
+ HadoopArchives harchives = new HadoopArchives(job);
+ try {
+ int res = harchives.run(args);
+ System.exit(res);
+ } catch(Exception e) {
+ System.err.println(e.getLocalizedMessage());
+ }
+ }
+}
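For completeness, driving the tool programmatically mirrors both main() above and the test below; the source and destination paths are hypothetical:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.HadoopArchives;
    import org.apache.hadoop.util.ToolRunner;

    public class ArchiveDriverExample {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(HadoopArchives.class);
        HadoopArchives tool = new HadoopArchives(job);
        // creates /user/hadoop/tmp/foo.har from /user/hadoop/test
        int ret = ToolRunner.run(tool, new String[] {
            "-archiveName", "foo.har", "/user/hadoop/test", "/user/hadoop/tmp"});
        System.exit(ret);
      }
    }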
Added: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestHarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestHarFileSystem.java?rev=663487&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestHarFileSystem.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestHarFileSystem.java Thu Jun 5 00:27:13 2008
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.HadoopArchives;
+import org.apache.hadoop.util.ToolRunner;
+
+import junit.framework.TestCase;
+
+/**
+ * test the har filesystem:
+ * create a har archive,
+ * run fs commands on it,
+ * and then run a map reduce job over it
+ */
+public class TestHarFileSystem extends TestCase {
+ private Path inputPath;
+ private MiniDFSCluster dfscluster;
+ private MiniMRCluster mapred;
+ private FileSystem fs;
+ private Path filea, fileb, filec;
+ private Path archivePath;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ dfscluster = new MiniDFSCluster(new JobConf(), 2, true, null);
+ fs = dfscluster.getFileSystem();
+ mapred = new MiniMRCluster(2, fs.getUri().toString(), 1);
+ inputPath = new Path(fs.getHomeDirectory(), "test");
+ filea = new Path(inputPath,"a");
+ fileb = new Path(inputPath,"b");
+ filec = new Path(inputPath,"c");
+ archivePath = new Path(fs.getHomeDirectory(), "tmp");
+ }
+
+ protected void tearDown() throws Exception {
+ try {
+ if (mapred != null) {
+ mapred.shutdown();
+ }
+ if (dfscluster != null) {
+ dfscluster.shutdown();
+ }
+ } catch(Exception e) {
+ System.err.println(e);
+ }
+ super.tearDown();
+ }
+
+ static class TextMapperReducer implements Mapper<LongWritable, Text, Text, Text>,
+ Reducer<Text, Text, Text, Text> {
+
+ public void configure(JobConf conf) {
+ //do nothing
+ }
+
+ public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ output.collect(value, new Text(""));
+ }
+
+ public void close() throws IOException {
+ // do nothing
+ }
+
+ public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ while(values.hasNext()) {
+ values.next();
+ output.collect(key, null);
+ }
+ }
+ }
+
+ public void testArchives() throws Exception {
+ fs.mkdirs(inputPath);
+
+ FSDataOutputStream out = fs.create(filea);
+ out.write("a".getBytes());
+ out.close();
+ out = fs.create(fileb);
+ out.write("b".getBytes());
+ out.close();
+ out = fs.create(filec);
+ out.write("c".getBytes());
+ out.close();
+ Configuration conf = mapred.createJobConf();
+ HadoopArchives har = new HadoopArchives(conf);
+ String[] args = new String[4];
+ args[0] = "-archiveName";
+ args[1] = "foo.har";
+ args[2] = inputPath.toString();
+ args[3] = archivePath.toString();
+ int ret = ToolRunner.run(har, args);
+ // check for the existence of the archive
+ assertTrue(ret == 0);
+ Path finalPath = new Path(archivePath, "foo.har");
+ Path fsPath = new Path(inputPath.toUri().getPath());
+ String relative = fsPath.toString().substring(1);
+ Path filePath = new Path(finalPath, relative);
+ //make it a har path
+ Path harPath = new Path("har://" + filePath.toUri().getPath());
+ assertTrue(fs.exists(new Path(finalPath, "_index")));
+ assertTrue(fs.exists(new Path(finalPath, "_masterindex")));
+ //creation tested
+ //check if the archive is same
+ // do ls and cat on all the files
+ FsShell shell = new FsShell(conf);
+ args = new String[2];
+ args[0] = "-ls";
+ args[1] = harPath.toString();
+ ret = ToolRunner.run(shell, args);
+ // ls should work.
+ assertTrue((ret == 0));
+ //now check for contents of filea
+ // fileb and filec
+ Path harFilea = new Path(harPath, "a");
+ Path harFileb = new Path(harPath, "b");
+ Path harFilec = new Path(harPath, "c");
+ FileSystem harFs = harFilea.getFileSystem(conf);
+ FSDataInputStream fin = harFs.open(harFilea);
+ byte[] b = new byte[4];
+ int readBytes = fin.read(b);
+ fin.close();
+ assertTrue("strings are equal ", (b[0] == "a".getBytes()[0]));
+ fin = harFs.open(harFileb);
+ fin.read(b);
+ fin.close();
+ assertTrue("strings are equal ", (b[0] == "b".getBytes()[0]));
+ fin = harFs.open(harFilec);
+ fin.read(b);
+ fin.close();
+ assertTrue("strings are equal ", (b[0] == "c".getBytes()[0]));
+ // ok all files match
+ // run a map reduce job
+ Path outdir = new Path(fs.getHomeDirectory(), "mapout");
+ JobConf jobconf = mapred.createJobConf();
+ FileInputFormat.addInputPath(jobconf, harPath);
+ jobconf.setInputFormat(TextInputFormat.class);
+ jobconf.setOutputFormat(TextOutputFormat.class);
+ FileOutputFormat.setOutputPath(jobconf, outdir);
+ jobconf.setMapperClass(TextMapperReducer.class);
+ jobconf.setMapOutputKeyClass(Text.class);
+ jobconf.setMapOutputValueClass(Text.class);
+ jobconf.setReducerClass(TextMapperReducer.class);
+ jobconf.setNumReduceTasks(1);
+ JobClient.runJob(jobconf);
+ args[1] = outdir.toString();
+ ret = ToolRunner.run(shell, args);
+
+ FileStatus[] status = fs.globStatus(new Path(outdir, "part*"));
+ Path reduceFile = status[0].getPath();
+ FSDataInputStream reduceIn = fs.open(reduceFile);
+ b = new byte[6];
+ reduceIn.read(b);
+ //assuming all the 6 bytes were read.
+ Text readTxt = new Text(b);
+ assertTrue("a\nb\nc\n".equals(readTxt.toString()));
+ assertTrue("number of bytes left should be -1", reduceIn.read(b) == -1);
+ reduceIn.close();
+ }
+}
\ No newline at end of file