Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/05/20 17:20:22 UTC
svn commit: r1125428 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/tools/org/apache/hadoop/fs/HarFileSystem.java
Author: mahadev
Date: Fri May 20 15:20:22 2011
New Revision: 1125428
URL: http://svn.apache.org/viewvc?rev=1125428&view=rev
Log:
MAPREDUCE-2459. Cache HAR filesystem metadata. (Mac Yang via mahadev)
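In outline: HarFileSystem now keeps a single process-wide map from archive URI to parsed index metadata, and drops an entry whenever the modification time of either index file has changed. A minimal standalone sketch of that pattern follows; the class and method names here are illustrative, not the committed API (the committed code keys a plain static HashMap by URI inside HarFileSystem itself and parses the index files where noted):

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Sketch of the caching pattern introduced by this commit: index metadata
// is cached per archive URI and invalidated when either index file's
// modification time changes. Names are illustrative, not the real API.
class MetaDataCacheSketch {

  static final class ArchiveMetaData {
    final long masterIndexTimestamp;
    final long archiveIndexTimestamp;
    ArchiveMetaData(long masterTs, long archiveTs) {
      this.masterIndexTimestamp = masterTs;
      this.archiveIndexTimestamp = archiveTs;
    }
  }

  // one cache for the whole process, keyed by archive URI
  private static final Map<URI, ArchiveMetaData> cache =
      new HashMap<URI, ArchiveMetaData>();

  // masterTs/archiveTs stand in for the modification times that
  // initialize() reads from the underlying filesystem on every call
  static ArchiveMetaData lookup(URI uri, long masterTs, long archiveTs) {
    ArchiveMetaData md = cache.get(uri);
    if (md != null && (md.masterIndexTimestamp != masterTs
        || md.archiveIndexTimestamp != archiveTs)) {
      // the archive was overwritten since we cached it; forget the entry
      cache.remove(uri);
      md = null;
    }
    if (md == null) {
      // in the real code this is where _masterindex and _index are parsed
      md = new ArchiveMetaData(masterTs, archiveTs);
      cache.put(uri, md);
    }
    return md;
  }
}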
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/fs/HarFileSystem.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1125428&r1=1125427&r2=1125428&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri May 20 15:20:22 2011
@@ -122,6 +122,8 @@ Trunk (unreleased changes)
MAPREDUCE-2516. Rename webinterface.private.actions to
mapreduce.jobtracker.webinterface.trusted (Ari Rabkin via todd)
+ MAPREDUCE-2459. Cache HAR filesystem metadata. (Mac Yang via mahadev)
+
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/fs/HarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/fs/HarFileSystem.java?rev=1125428&r1=1125427&r2=1125428&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/fs/HarFileSystem.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/fs/HarFileSystem.java Fri May 20 15:20:22 2011
@@ -28,6 +28,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -50,22 +51,20 @@ import org.apache.hadoop.util.Progressab
public class HarFileSystem extends FilterFileSystem {
public static final int VERSION = 3;
+
+ private static final Map<URI, HarMetaData> harMetaCache = new HashMap<URI, HarMetaData>();
+
// uri representation of this Har filesystem
private URI uri;
- // the version of this har filesystem
- private int version;
- // underlying uri
- private URI underLyingURI;
// the top level path of the archive
// in the underlying file system
private Path archivePath;
- // the masterIndex of the archive
- private Path masterIndex;
- // the index file
- private Path archiveIndex;
// the har auth
private String harAuth;
-
+
+ // pointer into the static metadata cache
+ private HarMetaData metadata;
+
/**
* public construction of harfilesystem
*
@@ -96,10 +95,10 @@ public class HarFileSystem extends Filte
* to be used in case not specified.
*/
public void initialize(URI name, Configuration conf) throws IOException {
- //decode the name
- underLyingURI = decodeHarURI(name, conf);
- // we got the right har Path- now check if this is
- //truly a har filesystem
+ // decode the name
+ URI underLyingURI = decodeHarURI(name, conf);
+ // we got the right har Path; now check if this is
+ // truly a har filesystem
Path harPath = archivePath(
new Path(name.getScheme(), name.getAuthority(), name.getPath()));
if (harPath == null) {
@@ -109,50 +108,49 @@ public class HarFileSystem extends Filte
if (fs == null) {
fs = FileSystem.get(underLyingURI, conf);
}
- this.uri = harPath.toUri();
- this.archivePath = new Path(this.uri.getPath());
- this.harAuth = getHarAuth(this.underLyingURI);
+ uri = harPath.toUri();
+ archivePath = new Path(uri.getPath());
+ harAuth = getHarAuth(underLyingURI);
//check for the underlying fs containing
// the index file
- this.masterIndex = new Path(archivePath, "_masterindex");
- this.archiveIndex = new Path(archivePath, "_index");
- if (!fs.exists(masterIndex) || !fs.exists(archiveIndex)) {
+ Path masterIndexPath = new Path(archivePath, "_masterindex");
+ Path archiveIndexPath = new Path(archivePath, "_index");
+ if (!fs.exists(masterIndexPath) || !fs.exists(archiveIndexPath)) {
throw new IOException("Invalid path for the Har Filesystem. " +
"No index file in " + harPath);
}
- try{
- this.version = getHarVersion();
- } catch(IOException io) {
- throw new IOException("Unable to " +
- "read the version of the Har file system: " + this.archivePath);
- }
- // make it always backwards-compatible
- if (this.version > HarFileSystem.VERSION) {
- throw new IOException("Invalid version " +
- this.version + " expected " + HarFileSystem.VERSION);
+
+ metadata = harMetaCache.get(uri);
+ if (metadata != null) {
+ FileStatus mStat = fs.getFileStatus(masterIndexPath);
+ FileStatus aStat = fs.getFileStatus(archiveIndexPath);
+ if (mStat.getModificationTime() != metadata.getMasterIndexTimestamp() ||
+ aStat.getModificationTime() != metadata.getArchiveIndexTimestamp()) {
+ // the archive has been overwritten since we last read it
+ // remove the entry from the metadata cache
+ metadata = null;
+ harMetaCache.remove(uri);
+ }
+ }
+ if (metadata == null) {
+ metadata = new HarMetaData(fs, masterIndexPath, archiveIndexPath);
+ metadata.parseMetaData();
+ harMetaCache.put(uri, metadata);
}
}
-
+
// get the version of the filesystem from the masterindex file
- // the version is currently not useful since its the first version
+ // the version is currently not useful since it's the first version
// of archives
- public int getHarVersion() throws IOException {
- FSDataInputStream masterIn = fs.open(masterIndex);
- LineReader lmaster = new LineReader(masterIn, getConf());
- Text line = new Text();
- lmaster.readLine(line);
- try {
- masterIn.close();
- } catch(IOException e){
- //disregard it.
- // its a read.
- }
- String versionLine = line.toString();
- String[] arr = versionLine.split(" ");
- int version = Integer.parseInt(arr[0]);
- return version;
+ public int getHarVersion() throws IOException {
+ if (metadata != null) {
+ return metadata.getVersion();
+ }
+ else {
+ throw new IOException("Invalid meta data for the Har Filesystem");
+ }
}
-
+
/*
* find the parent path that is the
* archive path in the path. The last
@@ -218,7 +216,7 @@ public class HarFileSystem extends Filte
}
return tmp;
}
-
+
private static String decodeString(String str)
throws UnsupportedEncodingException {
return URLDecoder.decode(str, "UTF-8");
@@ -226,13 +224,13 @@ public class HarFileSystem extends Filte
private String decodeFileName(String fname)
throws UnsupportedEncodingException {
-
+ int version = metadata.getVersion();
if (version == 2 || version == 3){
return decodeString(fname);
}
return fname;
}
-
+
/**
* return the top level archive.
*/
@@ -396,7 +394,7 @@ public class HarFileSystem extends Filte
long len) throws IOException {
HarStatus hstatus = getFileHarStatus(file.getPath());
Path partPath = new Path(archivePath, hstatus.getPartName());
- FileStatus partStatus = fs.getFileStatus(partPath);
+ FileStatus partStatus = metadata.getPartFileStatus(partPath);
// get all part blocks that overlap with the desired file blocks
BlockLocation[] locations =
@@ -448,109 +446,23 @@ public class HarFileSystem extends Filte
*/
private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses,
List<String> children) throws IOException {
- // read the index file
- FSDataInputStream aIn = null;
- try {
- aIn = fs.open(archiveIndex);
- LineReader aLin;
- long read = 0;
- aLin = new LineReader(aIn, getConf());
- String parentString = parent.getName();
- if (!parentString.endsWith(Path.SEPARATOR)){
- parentString += Path.SEPARATOR;
- }
- Path harPath = new Path(parentString);
- int harlen = harPath.depth();
- Text line = new Text();
- final Map<String, FileStatus> cache = new TreeMap<String, FileStatus>();
- for(final long len = fs.getFileStatus(archiveIndex).getLen();
- read < len; ) {
- int tmp = aLin.readLine(line);
- read += tmp;
- String lineFeed = line.toString();
- String child = decodeFileName(lineFeed.substring(0, lineFeed.indexOf(" ")));
- if ((child.startsWith(parentString))) {
- Path thisPath = new Path(child);
- if (thisPath.depth() == harlen + 1) {
- // bingo!
- HarStatus hstatus = new HarStatus(lineFeed);
- statuses.add(toFileStatus(hstatus, cache));
- }
- line.clear();
- }
- }
- } finally {
- if (aIn != null) {
- aIn.close();
- }
- }
- }
-
- // make sure that this harPath is relative to the har filesystem
- // this only works for relative paths. This returns the line matching
- // the file in the index. Returns a null if there is not matching
- // filename in the index file.
- private String fileStatusInIndex(Path harPath) throws IOException {
- // read the index file
- int hashCode = getHarHash(harPath);
- // get the master index to find the pos
- // in the index file
- FSDataInputStream in = fs.open(masterIndex);
- FileStatus masterStat = fs.getFileStatus(masterIndex);
- LineReader lin = new LineReader(in, getConf());
- Text line = new Text();
- long read = lin.readLine(line);
- //ignore the first line. this is the header of the index files
- String[] readStr = null;
- List<Store> stores = new ArrayList<Store>();
- while(read < masterStat.getLen()) {
- int b = lin.readLine(line);
- read += b;
- readStr = line.toString().split(" ");
- int startHash = Integer.parseInt(readStr[0]);
- int endHash = Integer.parseInt(readStr[1]);
- if (startHash <= hashCode && hashCode <= endHash) {
- stores.add(new Store(Long.parseLong(readStr[2]),
- Long.parseLong(readStr[3]), startHash,
- endHash));
- }
- line.clear();
- }
- try {
- lin.close();
- } catch(IOException io){
- // do nothing just a read.
- }
- FSDataInputStream aIn = fs.open(archiveIndex);
- LineReader aLin;
- String retStr = null;
- // now start reading the real index file
- for (Store s: stores) {
- read = 0;
- aIn.seek(s.begin);
- aLin = new LineReader(aIn, getConf());
- while (read + s.begin < s.end) {
- int tmp = aLin.readLine(line);
- read += tmp;
- String lineFeed = line.toString();
- String[] parsed = lineFeed.split(" ");
- parsed[0] = decodeFileName(parsed[0]);
- if (harPath.compareTo(new Path(parsed[0])) == 0) {
- // bingo!
- retStr = lineFeed;
- break;
+ String parentString = parent.getName();
+ if (!parentString.endsWith(Path.SEPARATOR)){
+ parentString += Path.SEPARATOR;
+ }
+ Path harPath = new Path(parentString);
+ int harlen = harPath.depth();
+ final Map<String, FileStatus> cache = new TreeMap<String, FileStatus>();
+
+ for (HarStatus hstatus : metadata.archive.values()) {
+ String child = hstatus.getName();
+ if ((child.startsWith(parentString))) {
+ Path thisPath = new Path(child);
+ if (thisPath.depth() == harlen + 1) {
+ statuses.add(toFileStatus(hstatus, cache));
}
- line.clear();
}
- if (retStr != null)
- break;
- }
- try {
- aIn.close();
- } catch(IOException io) {
- //do nothing
}
- return retStr;
}
/**
@@ -575,6 +487,7 @@ public class HarFileSystem extends Filte
}
long modTime = 0;
+ int version = metadata.getVersion();
if (version < 3) {
modTime = underlying.getModificationTime();
} else if (version == 3) {
@@ -607,6 +520,7 @@ public class HarFileSystem extends Filte
long startIndex;
long length;
long modificationTime = 0;
+
public HarStatus(String harString) throws UnsupportedEncodingException {
String[] splits = harString.split(" ");
this.name = decodeFileName(splits[0]);
@@ -616,6 +530,7 @@ public class HarFileSystem extends Filte
this.startIndex = Long.parseLong(splits[3]);
this.length = Long.parseLong(splits[4]);
+ int version = metadata.getVersion();
String[] propSplits = null;
// propSplits is used to retrieve the metainformation that Har versions
// 1 & 2 missed (modification time, permission, owner group).
@@ -698,12 +613,13 @@ public class HarFileSystem extends Filte
if (harPath == null) {
throw new IOException("Invalid file name: " + f + " in " + uri);
}
- String readStr = fileStatusInIndex(harPath);
- if (readStr == null) {
+ HarStatus hstatus = metadata.archive.get(harPath);
+ if (hstatus == null) {
throw new FileNotFoundException("File: " + f + " does not exist in " + uri);
}
- return new HarStatus(readStr);
+ return hstatus;
}
+
/**
* @return null since no checksum algorithm is implemented.
*/
@@ -741,7 +657,7 @@ public class HarFileSystem extends Filte
public FSDataOutputStream create(Path f,
FsPermission permission,
- EnumSet<CreateFlag> flag,
+ boolean overwrite,
int bufferSize,
short replication,
long blockSize,
@@ -790,11 +706,10 @@ public class HarFileSystem extends Filte
List<FileStatus> statuses = new ArrayList<FileStatus>();
Path tmpPath = makeQualified(f);
Path harPath = getPathInHar(tmpPath);
- String readStr = fileStatusInIndex(harPath);
- if (readStr == null) {
+ HarStatus hstatus = metadata.archive.get(harPath);
+ if (hstatus == null) {
throw new FileNotFoundException("File " + f + " not found in " + archivePath);
}
- HarStatus hstatus = new HarStatus(readStr);
if (hstatus.isDir()) {
fileStatusesInIndex(hstatus, statuses, hstatus.children);
} else {
@@ -1042,4 +957,114 @@ public class HarFileSystem extends Filte
super(new HarFsInputStream(fs, p, start, length, 0));
}
}
+
+ private class HarMetaData {
+ private FileSystem fs;
+ private int version;
+ // the masterIndex of the archive
+ private Path masterIndexPath;
+ // the index file
+ private Path archiveIndexPath;
+
+ private long masterIndexTimestamp;
+ private long archiveIndexTimestamp;
+
+ List<Store> stores = new ArrayList<Store>();
+ Map<Path, HarStatus> archive = new HashMap<Path, HarStatus>();
+ private Map<Path, FileStatus> partFileStatuses = new HashMap<Path, FileStatus>();
+
+ public HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath) {
+ this.fs = fs;
+ this.masterIndexPath = masterIndexPath;
+ this.archiveIndexPath = archiveIndexPath;
+ }
+
+ public FileStatus getPartFileStatus(Path partPath) throws IOException {
+ FileStatus status;
+ status = partFileStatuses.get(partPath);
+ if (status == null) {
+ status = fs.getFileStatus(partPath);
+ partFileStatuses.put(partPath, status);
+ }
+ return status;
+ }
+
+ public long getMasterIndexTimestamp() {
+ return masterIndexTimestamp;
+ }
+
+ public long getArchiveIndexTimestamp() {
+ return archiveIndexTimestamp;
+ }
+
+ private int getVersion() {
+ return version;
+ }
+
+ private void parseMetaData() throws IOException {
+ FSDataInputStream in = fs.open(masterIndexPath);
+ FileStatus masterStat = fs.getFileStatus(masterIndexPath);
+ masterIndexTimestamp = masterStat.getModificationTime();
+ LineReader lin = new LineReader(in, getConf());
+ Text line = new Text();
+ long read = lin.readLine(line);
+
+ // the first line contains the version of the index file
+ String versionLine = line.toString();
+ String[] arr = versionLine.split(" ");
+ version = Integer.parseInt(arr[0]);
+ // make it always backwards-compatible
+ if (this.version > HarFileSystem.VERSION) {
+ throw new IOException("Invalid version " +
+ this.version + " expected " + HarFileSystem.VERSION);
+ }
+
+ // each line contains a hashcode range and the index file name
+ String[] readStr = null;
+ while(read < masterStat.getLen()) {
+ int b = lin.readLine(line);
+ read += b;
+ readStr = line.toString().split(" ");
+ int startHash = Integer.parseInt(readStr[0]);
+ int endHash = Integer.parseInt(readStr[1]);
+ stores.add(new Store(Long.parseLong(readStr[2]),
+ Long.parseLong(readStr[3]), startHash,
+ endHash));
+ line.clear();
+ }
+ try {
+ // close the master index
+ lin.close();
+ } catch(IOException io){
+ // do nothing just a read.
+ }
+
+ FSDataInputStream aIn = fs.open(archiveIndexPath);
+ FileStatus archiveStat = fs.getFileStatus(archiveIndexPath);
+ archiveIndexTimestamp = archiveStat.getModificationTime();
+ LineReader aLin;
+ String retStr = null;
+ // now start reading the real index file
+ for (Store s: stores) {
+ read = 0;
+ aIn.seek(s.begin);
+ aLin = new LineReader(aIn, getConf());
+ while (read + s.begin < s.end) {
+ int tmp = aLin.readLine(line);
+ read += tmp;
+ String lineFeed = line.toString();
+ String[] parsed = lineFeed.split(" ");
+ parsed[0] = decodeFileName(parsed[0]);
+ archive.put(new Path(parsed[0]), new HarStatus(lineFeed));
+ line.clear();
+ }
+ }
+ try {
+ // close the archive index
+ aIn.close();
+ } catch(IOException io) {
+ // do nothing just a read.
+ }
+ }
+ }
}
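For reference, a hedged example of how the cache shows up to a client; the archive path and cluster name below are hypothetical, and the cache itself is internal to HarFileSystem. Repeated access to the same har:// URI re-reads _masterindex and _index only when their modification times have changed:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HarCacheExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hypothetical archive; "har://hdfs-namenode/..." addresses an
    // archive stored on hdfs://namenode/...
    URI archive = new URI("har://hdfs-namenode/user/alice/logs.har");

    // the first initialize() parses _masterindex and _index and caches them
    FileSystem fs = FileSystem.get(archive, conf);
    for (FileStatus st : fs.listStatus(new Path("/user/alice/logs.har/2011"))) {
      System.out.println(st.getPath());
    }

    // later lookups against the same archive URI reuse the cached
    // metadata instead of re-reading the index files
    fs.getFileStatus(new Path("/user/alice/logs.har/2011/part-00000"));
  }
}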