You are viewing a plain-text version of this content. The link to the canonical version (labelled "here" in the original) was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/05/20 17:20:22 UTC

svn commit: r1125428 - in /hadoop/mapreduce/trunk: CHANGES.txt src/tools/org/apache/hadoop/fs/HarFileSystem.java

Author: mahadev
Date: Fri May 20 15:20:22 2011
New Revision: 1125428

URL: http://svn.apache.org/viewvc?rev=1125428&view=rev
Log:
MAPREDUCE-2459. Cache HAR filesystem metadata. (Mac Yang via mahadev)

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/fs/HarFileSystem.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1125428&r1=1125427&r2=1125428&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri May 20 15:20:22 2011
@@ -122,6 +122,8 @@ Trunk (unreleased changes)
     MAPREDUCE-2516. Rename webinterface.private.actions to
     mapreduce.jobtracker.webinterface.trusted (Ari Rabkin via todd)
 
+    MAPREDUCE-2459. Cache HAR filesystem metadata. (Mac Yang via mahadev)
+
   OPTIMIZATIONS
     
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/fs/HarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/fs/HarFileSystem.java?rev=1125428&r1=1125427&r2=1125428&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/fs/HarFileSystem.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/fs/HarFileSystem.java Fri May 20 15:20:22 2011
@@ -28,6 +28,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -50,22 +51,20 @@ import org.apache.hadoop.util.Progressab
 
 public class HarFileSystem extends FilterFileSystem {
   public static final int VERSION = 3;
+
+  private static final Map<URI, HarMetaData> harMetaCache = new HashMap<URI, HarMetaData>();
+
   // uri representation of this Har filesystem
   private URI uri;
-  // the version of this har filesystem
-  private int version;
-  // underlying uri 
-  private URI underLyingURI;
   // the top level path of the archive
   // in the underlying file system
   private Path archivePath;
-  // the masterIndex of the archive
-  private Path masterIndex;
-  // the index file 
-  private Path archiveIndex;
   // the har auth
   private String harAuth;
-  
+
+  // pointer into the static metadata cache
+  private HarMetaData metadata;
+
   /**
    * public construction of harfilesystem
    *
@@ -96,10 +95,10 @@ public class HarFileSystem extends Filte
    * to be used in case not specified.
    */
   public void initialize(URI name, Configuration conf) throws IOException {
-    //decode the name
-    underLyingURI = decodeHarURI(name, conf);
-    //  we got the right har Path- now check if this is 
-    //truly a har filesystem
+    // decode the name
+    URI underLyingURI = decodeHarURI(name, conf);
+    // we got the right har Path- now check if this is 
+    // truly a har filesystem
     Path harPath = archivePath(
       new Path(name.getScheme(), name.getAuthority(), name.getPath()));
     if (harPath == null) { 
@@ -109,50 +108,49 @@ public class HarFileSystem extends Filte
     if (fs == null) {
       fs = FileSystem.get(underLyingURI, conf);
     }
-    this.uri = harPath.toUri();
-    this.archivePath = new Path(this.uri.getPath());
-    this.harAuth = getHarAuth(this.underLyingURI);
+    uri = harPath.toUri();
+    archivePath = new Path(uri.getPath());
+    harAuth = getHarAuth(underLyingURI);
     //check for the underlying fs containing
     // the index file
-    this.masterIndex = new Path(archivePath, "_masterindex");
-    this.archiveIndex = new Path(archivePath, "_index");
-    if (!fs.exists(masterIndex) || !fs.exists(archiveIndex)) {
+    Path masterIndexPath = new Path(archivePath, "_masterindex");
+    Path archiveIndexPath = new Path(archivePath, "_index");
+    if (!fs.exists(masterIndexPath) || !fs.exists(archiveIndexPath)) {
       throw new IOException("Invalid path for the Har Filesystem. " +
           "No index file in " + harPath);
     }
-    try{ 
-      this.version = getHarVersion();
-    } catch(IOException io) {
-      throw new IOException("Unable to " +
-          "read the version of the Har file system: " + this.archivePath);
-    }
-    // make it always backwards-compatible
-    if (this.version > HarFileSystem.VERSION) {
-      throw new IOException("Invalid version " + 
-          this.version + " expected " + HarFileSystem.VERSION);
+
+    metadata = harMetaCache.get(uri);
+    if (metadata != null) {
+      FileStatus mStat = fs.getFileStatus(masterIndexPath);
+      FileStatus aStat = fs.getFileStatus(archiveIndexPath);
+      if (mStat.getModificationTime() != metadata.getMasterIndexTimestamp() ||
+          aStat.getModificationTime() != metadata.getArchiveIndexTimestamp()) {
+        // the archive has been overwritten since we last read it
+        // remove the entry from the meta data cache
+        metadata = null;
+        harMetaCache.remove(uri);
+      }
+    }
+    if (metadata == null) {
+      metadata = new HarMetaData(fs, masterIndexPath, archiveIndexPath);
+      metadata.parseMetaData();
+      harMetaCache.put(uri, metadata);
     }
   }
-  
+
   // get the version of the filesystem from the masterindex file
-  // the version is currently not useful since its the first version 
+  // the version is currently not useful since its the first version
   // of archives
-  public int getHarVersion() throws IOException { 
-    FSDataInputStream masterIn = fs.open(masterIndex);
-    LineReader lmaster = new LineReader(masterIn, getConf());
-    Text line = new Text();
-    lmaster.readLine(line);
-    try {
-      masterIn.close();
-    } catch(IOException e){
-      //disregard it.
-      // its a read.
-    }
-    String versionLine = line.toString();
-    String[] arr = versionLine.split(" ");
-    int version = Integer.parseInt(arr[0]);
-    return version;
+  public int getHarVersion() throws IOException {
+    if (metadata != null) {
+      return metadata.getVersion();
+    }
+    else {
+      throw new IOException("Invalid meta data for the Har Filesystem");
+    }
   }
-  
+
   /*
    * find the parent path that is the 
    * archive path in the path. The last
@@ -218,7 +216,7 @@ public class HarFileSystem extends Filte
     }
     return tmp;
   }
-  
+
   private static String decodeString(String str)
     throws UnsupportedEncodingException {
     return URLDecoder.decode(str, "UTF-8");
@@ -226,13 +224,13 @@ public class HarFileSystem extends Filte
 
   private String decodeFileName(String fname) 
     throws UnsupportedEncodingException {
-    
+    int version = metadata.getVersion();
     if (version == 2 || version == 3){
       return decodeString(fname);
     }
     return fname;
   }
-  
+
   /**
    * return the top level archive.
    */
@@ -396,7 +394,7 @@ public class HarFileSystem extends Filte
                                                long len) throws IOException {
     HarStatus hstatus = getFileHarStatus(file.getPath());
     Path partPath = new Path(archivePath, hstatus.getPartName());
-    FileStatus partStatus = fs.getFileStatus(partPath);
+    FileStatus partStatus = metadata.getPartFileStatus(partPath);
 
     // get all part blocks that overlap with the desired file blocks
     BlockLocation[] locations = 
@@ -448,109 +446,23 @@ public class HarFileSystem extends Filte
    */
   private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses,
       List<String> children) throws IOException {
-    // read the index file
-    FSDataInputStream aIn = null;
-    try {
-      aIn = fs.open(archiveIndex);
-      LineReader aLin;
-      long read = 0;
-      aLin = new LineReader(aIn, getConf());
-      String parentString = parent.getName();
-      if (!parentString.endsWith(Path.SEPARATOR)){
-          parentString += Path.SEPARATOR;
-      }
-      Path harPath = new Path(parentString);
-      int harlen = harPath.depth();
-      Text line = new Text();
-      final Map<String, FileStatus> cache = new TreeMap<String, FileStatus>();
-      for(final long len = fs.getFileStatus(archiveIndex).getLen();
-          read < len; ) {
-        int tmp = aLin.readLine(line);
-        read += tmp;
-        String lineFeed = line.toString();
-        String child = decodeFileName(lineFeed.substring(0, lineFeed.indexOf(" ")));
-        if ((child.startsWith(parentString))) {
-          Path thisPath = new Path(child);
-          if (thisPath.depth() == harlen + 1) {
-            // bingo!
-            HarStatus hstatus = new HarStatus(lineFeed);
-            statuses.add(toFileStatus(hstatus, cache));
-          }
-          line.clear();
-        }
-      }
-    } finally {
-      if (aIn != null) {
-        aIn.close();
-      }
-    }
-  }
-  
-  // make sure that this harPath is relative to the har filesystem
-  // this only works for relative paths. This returns the line matching
-  // the file in the index. Returns a null if there is not matching 
-  // filename in the index file.
-  private String fileStatusInIndex(Path harPath) throws IOException {
-    // read the index file 
-    int hashCode = getHarHash(harPath);
-    // get the master index to find the pos 
-    // in the index file
-    FSDataInputStream in = fs.open(masterIndex);
-    FileStatus masterStat = fs.getFileStatus(masterIndex);
-    LineReader lin = new LineReader(in, getConf());
-    Text line = new Text();
-    long read = lin.readLine(line);
-   //ignore the first line. this is the header of the index files
-    String[] readStr = null;
-    List<Store> stores = new ArrayList<Store>();
-    while(read < masterStat.getLen()) {
-      int b = lin.readLine(line);
-      read += b;
-      readStr = line.toString().split(" ");
-      int startHash = Integer.parseInt(readStr[0]);
-      int endHash  = Integer.parseInt(readStr[1]);
-      if (startHash <= hashCode && hashCode <= endHash) {
-        stores.add(new Store(Long.parseLong(readStr[2]), 
-            Long.parseLong(readStr[3]), startHash,
-            endHash));
-      }
-      line.clear();
-    }
-    try {
-      lin.close();
-    } catch(IOException io){
-      // do nothing just a read.
-    }
-    FSDataInputStream aIn = fs.open(archiveIndex);
-    LineReader aLin;
-    String retStr = null;
-    // now start reading the real index file
-    for (Store s: stores) {
-      read = 0;
-      aIn.seek(s.begin);
-      aLin = new LineReader(aIn, getConf());
-      while (read + s.begin < s.end) {
-        int tmp = aLin.readLine(line);
-        read += tmp;
-        String lineFeed = line.toString();
-        String[] parsed = lineFeed.split(" ");
-        parsed[0] = decodeFileName(parsed[0]);
-        if (harPath.compareTo(new Path(parsed[0])) == 0) {
-          // bingo!
-          retStr = lineFeed;
-          break;
+    String parentString = parent.getName();
+    if (!parentString.endsWith(Path.SEPARATOR)){
+        parentString += Path.SEPARATOR;
+    }
+    Path harPath = new Path(parentString);
+    int harlen = harPath.depth();
+    final Map<String, FileStatus> cache = new TreeMap<String, FileStatus>();
+
+    for (HarStatus hstatus : metadata.archive.values()) {
+      String child = hstatus.getName();
+      if ((child.startsWith(parentString))) {
+        Path thisPath = new Path(child);
+        if (thisPath.depth() == harlen + 1) {
+          statuses.add(toFileStatus(hstatus, cache));
         }
-        line.clear();
       }
-      if (retStr != null)
-        break;
-    }
-    try {
-      aIn.close();
-    } catch(IOException io) {
-      //do nothing
     }
-    return retStr;
   }
 
   /**
@@ -575,6 +487,7 @@ public class HarFileSystem extends Filte
     }
 
     long modTime = 0;
+    int version = metadata.getVersion();
     if (version < 3) {
       modTime = underlying.getModificationTime();
     } else if (version == 3) {
@@ -607,6 +520,7 @@ public class HarFileSystem extends Filte
     long startIndex;
     long length;
     long modificationTime = 0;
+
     public HarStatus(String harString) throws UnsupportedEncodingException {
       String[] splits = harString.split(" ");
       this.name = decodeFileName(splits[0]);
@@ -616,6 +530,7 @@ public class HarFileSystem extends Filte
       this.startIndex = Long.parseLong(splits[3]);
       this.length = Long.parseLong(splits[4]);
 
+      int version = metadata.getVersion();
       String[] propSplits = null;
       // propSplits is used to retrieve the metainformation that Har versions
       // 1 & 2 missed (modification time, permission, owner group).
@@ -698,12 +613,13 @@ public class HarFileSystem extends Filte
     if (harPath == null) {
       throw new IOException("Invalid file name: " + f + " in " + uri);
     }
-    String readStr = fileStatusInIndex(harPath);
-    if (readStr == null) {
+    HarStatus hstatus = metadata.archive.get(harPath);
+    if (hstatus == null) {
       throw new FileNotFoundException("File: " +  f + " does not exist in " + uri);
     }
-    return new HarStatus(readStr);
+    return hstatus;
   }
+
   /**
    * @return null since no checksum algorithm is implemented.
    */
@@ -741,7 +657,7 @@ public class HarFileSystem extends Filte
   
   public FSDataOutputStream create(Path f,
       FsPermission permission,
-      EnumSet<CreateFlag> flag,
+      boolean overwrite,
       int bufferSize,
       short replication,
       long blockSize,
@@ -790,11 +706,10 @@ public class HarFileSystem extends Filte
     List<FileStatus> statuses = new ArrayList<FileStatus>();
     Path tmpPath = makeQualified(f);
     Path harPath = getPathInHar(tmpPath);
-    String readStr = fileStatusInIndex(harPath);
-    if (readStr == null) {
+    HarStatus hstatus = metadata.archive.get(harPath);
+    if (hstatus == null) {
       throw new FileNotFoundException("File " + f + " not found in " + archivePath);
     }
-    HarStatus hstatus = new HarStatus(readStr);
     if (hstatus.isDir()) {
       fileStatusesInIndex(hstatus, statuses, hstatus.children);
     } else {
@@ -1042,4 +957,114 @@ public class HarFileSystem extends Filte
         super(new HarFsInputStream(fs, p, start, length, 0));
     }
   }
+
+  private class HarMetaData {
+    private FileSystem fs;
+    private int version;
+    // the masterIndex of the archive
+    private Path masterIndexPath;
+    // the index file 
+    private Path archiveIndexPath;
+
+    private long masterIndexTimestamp;
+    private long archiveIndexTimestamp;
+
+    List<Store> stores = new ArrayList<Store>();
+    Map<Path, HarStatus> archive = new HashMap<Path, HarStatus>();
+    private Map<Path, FileStatus> partFileStatuses = new HashMap<Path, FileStatus>();
+
+    public HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath) {
+      this.fs = fs;
+      this.masterIndexPath = masterIndexPath;
+      this.archiveIndexPath = archiveIndexPath;
+    }
+
+    public FileStatus getPartFileStatus(Path partPath) throws IOException {
+      FileStatus status;
+      status = partFileStatuses.get(partPath);
+      if (status == null) {
+        status = fs.getFileStatus(partPath);
+        partFileStatuses.put(partPath, status);
+      }
+      return status;
+    }
+
+    public long getMasterIndexTimestamp() {
+      return masterIndexTimestamp;
+    }
+
+    public long getArchiveIndexTimestamp() {
+      return archiveIndexTimestamp;
+    }
+
+    private int getVersion() {
+      return version;
+    }
+
+    private void parseMetaData() throws IOException {
+      FSDataInputStream in = fs.open(masterIndexPath);
+      FileStatus masterStat = fs.getFileStatus(masterIndexPath);
+      masterIndexTimestamp = masterStat.getModificationTime();
+      LineReader lin = new LineReader(in, getConf());
+      Text line = new Text();
+      long read = lin.readLine(line);
+
+     // the first line contains the version of the index file
+      String versionLine = line.toString();
+      String[] arr = versionLine.split(" ");
+      version = Integer.parseInt(arr[0]);
+      // make it always backwards-compatible
+      if (this.version > HarFileSystem.VERSION) {
+        throw new IOException("Invalid version " + 
+            this.version + " expected " + HarFileSystem.VERSION);
+      }
+
+      // each line contains a hashcode range and the index file name
+      String[] readStr = null;
+      while(read < masterStat.getLen()) {
+        int b = lin.readLine(line);
+        read += b;
+        readStr = line.toString().split(" ");
+        int startHash = Integer.parseInt(readStr[0]);
+        int endHash  = Integer.parseInt(readStr[1]);
+        stores.add(new Store(Long.parseLong(readStr[2]), 
+            Long.parseLong(readStr[3]), startHash,
+            endHash));
+        line.clear();
+      }
+      try {
+        // close the master index
+        lin.close();
+      } catch(IOException io){
+        // do nothing just a read.
+      }
+
+      FSDataInputStream aIn = fs.open(archiveIndexPath);
+      FileStatus archiveStat = fs.getFileStatus(archiveIndexPath);
+      archiveIndexTimestamp = archiveStat.getModificationTime();
+      LineReader aLin;
+      String retStr = null;
+      // now start reading the real index file
+      for (Store s: stores) {
+        read = 0;
+        aIn.seek(s.begin);
+        aLin = new LineReader(aIn, getConf());
+        while (read + s.begin < s.end) {
+          int tmp = aLin.readLine(line);
+          read += tmp;
+          String lineFeed = line.toString();
+          String[] parsed = lineFeed.split(" ");
+          parsed[0] = decodeFileName(parsed[0]);
+          archive.put(new Path(parsed[0]), new HarStatus(lineFeed));
+          line.clear();
+        }
+      }
+      try {
+        // close the archive index
+        aIn.close();
+      } catch(IOException io) {
+        // do nothing just a read.
+      }
+    }
+  }
 }