Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2009/10/27 16:44:06 UTC

svn commit: r830230 [7/9] - in /hadoop/mapreduce/branches/HDFS-641: ./ .eclipse.templates/ conf/ ivy/ lib/ src/c++/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-sche...

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Tue Oct 27 15:43:58 2009
@@ -20,8 +20,11 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
@@ -32,6 +35,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.RunJar;
@@ -59,9 +63,17 @@
     LogFactory.getLog(TrackerDistributedCacheManager.class);
 
   private final LocalFileSystem localFs;
+  
+  private LocalDirAllocator lDirAllocator;
+  
+  private Configuration trackerConf;
+  
+  private Random random = new Random();
 
   public TrackerDistributedCacheManager(Configuration conf) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
+    this.trackerConf = conf;
+    this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
   }
 
   /**
@@ -71,7 +83,7 @@
    * @param cache the cache to be localized, this should be specified as
    * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Configuration file which contains the filesystem
-   * @param baseDir The base cache Dir where you wnat to localize the 
+   * @param subDir The base cache subDir where you want to localize the 
    *  files/archives
    * @param fileStatus The file status on the dfs.
    * @param isArchive if the cache is an archive or a file. In case it is an
@@ -94,35 +106,55 @@
    * @throws IOException
    */
   Path getLocalCache(URI cache, Configuration conf,
-      Path baseDir, FileStatus fileStatus,
+      String subDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
       Path currentWorkDir, boolean honorSymLinkConf)
       throws IOException {
-    String cacheId = makeRelative(cache, conf);
+    String key = getKey(cache, conf, confFileStamp);
     CacheStatus lcacheStatus;
-    Path localizedPath;
+    Path localizedPath = null;
     synchronized (cachedArchives) {
-      lcacheStatus = cachedArchives.get(cacheId);
+      lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
         // was never localized
-        lcacheStatus = new CacheStatus(baseDir, 
-          new Path(baseDir, new Path(cacheId)));
-        cachedArchives.put(cacheId, lcacheStatus);
-      }
-
-      synchronized (lcacheStatus) {
+        String cachePath = new Path(subDir, 
+          new Path(String.valueOf(random.nextLong()),
+            makeRelative(cache, conf))).toString();
+        Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
+          fileStatus.getLen(), trackerConf);
+        lcacheStatus = new CacheStatus(
+          new Path(localPath.toString().replace(cachePath, "")), localPath); 
+        cachedArchives.put(key, lcacheStatus);
+      }
+
+      //mark the cache for use. 
+      lcacheStatus.refcount++;
+    }
+    
+    // do the localization, after releasing the global lock
+    synchronized (lcacheStatus) {
+      if (!lcacheStatus.isInited()) {
         localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
-            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
-        lcacheStatus.refcount++;
+            fileStatus, isArchive);
+        lcacheStatus.initComplete();
+      } else {
+        localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+            lcacheStatus, fileStatus, isArchive);
       }
+      createSymlink(conf, cache, lcacheStatus, isArchive,
+          currentWorkDir, honorSymLinkConf);
     }
 
     // try deleting stuff if you can
     long size = 0;
-    synchronized (baseDirSize) {
-      Long get = baseDirSize.get(baseDir);
-      if ( get != null ) {
-      size = get.longValue();
+    synchronized (lcacheStatus) {
+      synchronized (baseDirSize) {
+        Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+        if ( get != null ) {
+          size = get.longValue();
+        } else {
+          LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
+        }
       }
     }
     // setting the cache size to a default of 10GB
@@ -142,40 +174,58 @@
    * is contained in.
    * @throws IOException
    */
-  void releaseCache(URI cache, Configuration conf)
+  void releaseCache(URI cache, Configuration conf, long timeStamp)
     throws IOException {
-    String cacheId = makeRelative(cache, conf);
+    String key = getKey(cache, conf, timeStamp);
     synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null)
+      CacheStatus lcacheStatus = cachedArchives.get(key);
+      if (lcacheStatus == null) {
+        LOG.warn("Cannot find localized cache: " + cache + 
+                 " (key: " + key + ") in releaseCache!");
         return;
-      synchronized (lcacheStatus) {
-        lcacheStatus.refcount--;
       }
+      
+      // decrement ref count 
+      lcacheStatus.refcount--;
     }
   }
 
   // To delete the caches which have a refcount of zero
 
   private void deleteCache(Configuration conf) throws IOException {
+    Set<CacheStatus> deleteSet = new HashSet<CacheStatus>();
     // try deleting cache Status with refcount of zero
     synchronized (cachedArchives) {
       for (Iterator<String> it = cachedArchives.keySet().iterator(); 
           it.hasNext();) {
         String cacheId = it.next();
         CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        synchronized (lcacheStatus) {
-          if (lcacheStatus.refcount == 0) {
-            // delete this cache entry
-            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-            synchronized (baseDirSize) {
-              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
-              if ( dirSize != null ) {
-              dirSize -= lcacheStatus.size;
-              baseDirSize.put(lcacheStatus.baseDir, dirSize);
-              }
-            }
-            it.remove();
+        
+        // if reference count is zero 
+        // mark the cache for deletion
+        if (lcacheStatus.refcount == 0) {
+          // delete this cache entry from the global list 
+          // and mark the localized file for deletion
+          deleteSet.add(lcacheStatus);
+          it.remove();
+        }
+      }
+    }
+    
+    // do the deletion, after releasing the global lock
+    for (CacheStatus lcacheStatus : deleteSet) {
+      synchronized (lcacheStatus) {
+        FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+        LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+        // decrement the size of the cache from baseDirSize
+        synchronized (baseDirSize) {
+          Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+          if ( dirSize != null ) {
+            dirSize -= lcacheStatus.size;
+            baseDirSize.put(lcacheStatus.baseDir, dirSize);
+          } else {
+            LOG.warn("Cannot find record of the baseDir: " + 
+                     lcacheStatus.baseDir + " during delete!");
           }
         }
       }
@@ -208,6 +258,11 @@
     return path;
   }
 
+  String getKey(URI cache, Configuration conf, long timeStamp) 
+      throws IOException {
+    return makeRelative(cache, conf) + String.valueOf(timeStamp);
+  }
+  
   /**
    * Returns mtime of a given cache file on hdfs.
    * 
@@ -224,144 +279,115 @@
     return fileSystem.getFileStatus(filePath).getModificationTime();
   }
 
-  private Path cacheFilePath(Path p) {
-    return new Path(p, p.getName());
-  }
+  private Path checkCacheStatusValidity(Configuration conf,
+      URI cache, long confFileStamp,
+      CacheStatus cacheStatus,
+      FileStatus fileStatus,
+      boolean isArchive
+      ) throws IOException {
+    FileSystem fs = FileSystem.get(cache, conf);
+    // The localized copy has to be fresh with respect to the file on the DFS.
+    if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
+                          cacheStatus, fileStatus)) {
+      throw new IOException("Stale cache file: " + cacheStatus.localLoadPath + 
+                            " for cache-file: " + cache);
+    }
 
-  // the method which actually copies the caches locally and unjars/unzips them
-  // and does chmod for the files
-  private Path localizeCache(Configuration conf,
-                                    URI cache, long confFileStamp,
-                                    CacheStatus cacheStatus,
-                                    FileStatus fileStatus,
-                                    boolean isArchive,
-                                    Path currentWorkDir, 
-                                    boolean honorSymLinkConf)
-  throws IOException {
+    LOG.info(String.format("Using existing cache of %s->%s",
+        cache.toString(), cacheStatus.localLoadPath));
+    return cacheStatus.localLoadPath;
+  }
+  
+  private void createSymlink(Configuration conf, URI cache,
+      CacheStatus cacheStatus, boolean isArchive,
+      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
     boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
     if(cache.getFragment() == null) {
       doSymlink = false;
     }
-    FileSystem fs = FileSystem.get(cache, conf);
     String link = 
       currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     File flink = new File(link);
-    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
-                           cacheStatus, fileStatus)) {
-      LOG.info(String.format("Using existing cache of %s->%s",
-          cache.toString(), cacheStatus.localLoadPath));
-      if (isArchive) {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheStatus.localLoadPath.toString(),
-                             link);
-        }
-
-        return cacheStatus.localLoadPath;
-      }
-      else {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(
-              cacheFilePath(cacheStatus.localLoadPath).toString(), link);
-        }
-        return cacheFilePath(cacheStatus.localLoadPath);
+    if (doSymlink){
+      if (!flink.exists()) {
+        FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
       }
+    }
+  }
+  
+  // the method which actually copies the caches locally and unjars/unzips them
+  // and does chmod for the files
+  private Path localizeCache(Configuration conf,
+                                    URI cache, long confFileStamp,
+                                    CacheStatus cacheStatus,
+                                    FileStatus fileStatus,
+                                    boolean isArchive)
+  throws IOException {
+    FileSystem fs = FileSystem.get(cache, conf);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path parchive = null;
+    if (isArchive) {
+      parchive = new Path(cacheStatus.localLoadPath,
+        new Path(cacheStatus.localLoadPath.getName()));
     } else {
+      parchive = cacheStatus.localLoadPath;
+    }
 
-      // remove the old archive
-      // if the old archive cannot be removed since it is being used by another
-      // job
-      // return null
-      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
-        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
-                              + " is in use and cannot be refreshed");
-
-      FileSystem localFs = FileSystem.getLocal(conf);
-      localFs.delete(cacheStatus.localLoadPath, true);
-      synchronized (baseDirSize) {
-      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-      if ( dirSize != null ) {
-        dirSize -= cacheStatus.size;
-        baseDirSize.put(cacheStatus.baseDir, dirSize);
-      }
-      }
-      Path parchive = new Path(cacheStatus.localLoadPath,
-                               new Path(cacheStatus.localLoadPath.getName()));
-
-      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
-        throw new IOException("Mkdirs failed to create directory " +
-                              cacheStatus.localLoadPath.toString());
-      }
-
-      String cacheId = cache.getPath();
-      fs.copyToLocalFile(new Path(cacheId), parchive);
-      if (isArchive) {
-        String tmpArchive = parchive.toString().toLowerCase();
-        File srcFile = new File(parchive.toString());
-        File destDir = new File(parchive.getParent().toString());
-        LOG.info(String.format("Extracting %s to %s",
-            srcFile.toString(), destDir.toString()));
-        if (tmpArchive.endsWith(".jar")) {
-          RunJar.unJar(srcFile, destDir);
-        } else if (tmpArchive.endsWith(".zip")) {
-          FileUtil.unZip(srcFile, destDir);
-        } else if (isTarFile(tmpArchive)) {
-          FileUtil.unTar(srcFile, destDir);
-        } else {
-          LOG.warn(String.format(
+    if (!localFs.mkdirs(parchive.getParent())) {
+      throw new IOException("Mkdirs failed to create directory " +
+          cacheStatus.localLoadPath.toString());
+    }
+
+    String cacheId = cache.getPath();
+    fs.copyToLocalFile(new Path(cacheId), parchive);
+    if (isArchive) {
+      String tmpArchive = parchive.toString().toLowerCase();
+      File srcFile = new File(parchive.toString());
+      File destDir = new File(parchive.getParent().toString());
+      LOG.info(String.format("Extracting %s to %s",
+          srcFile.toString(), destDir.toString()));
+      if (tmpArchive.endsWith(".jar")) {
+        RunJar.unJar(srcFile, destDir);
+      } else if (tmpArchive.endsWith(".zip")) {
+        FileUtil.unZip(srcFile, destDir);
+      } else if (isTarFile(tmpArchive)) {
+        FileUtil.unTar(srcFile, destDir);
+      } else {
+        LOG.warn(String.format(
             "Cache file %s specified as archive, but not valid extension.", 
             srcFile.toString()));
-          // else will not do anyhting
-          // and copy the file into the dir as it is
-        }
+        // else will not do anything
+        // and copy the file into the dir as it is
       }
+    }
 
-      long cacheSize = 
-        FileUtil.getDU(new File(parchive.getParent().toString()));
-      cacheStatus.size = cacheSize;
-      synchronized (baseDirSize) {
-        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-        if( dirSize == null ) {
-          dirSize = Long.valueOf(cacheSize);
-        } else {
-          dirSize += cacheSize;
-        }
-        baseDirSize.put(cacheStatus.baseDir, dirSize);
+    long cacheSize = 
+      FileUtil.getDU(new File(parchive.getParent().toString()));
+    cacheStatus.size = cacheSize;
+    synchronized (baseDirSize) {
+      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+      if( dirSize == null ) {
+        dirSize = Long.valueOf(cacheSize);
+      } else {
+        dirSize += cacheSize;
       }
+      baseDirSize.put(cacheStatus.baseDir, dirSize);
+    }
 
-      // do chmod here
-      try {
-        //Setting recursive permission to grant everyone read and execute
-        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-      } catch(InterruptedException e) {
+    // do chmod here
+    try {
+      //Setting recursive permission to grant everyone read and execute
+      FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx", true);
+    } catch(InterruptedException e) {
       LOG.warn("Exception in chmod" + e.toString());
-      }
-
-      // update cacheStatus to reflect the newly cached file
-      cacheStatus.currentStatus = true;
-      cacheStatus.mtime = getTimestamp(conf, cache);
-
-      LOG.info(String.format("Cached %s as %s",
-          cache.toString(), cacheStatus.localLoadPath));
     }
 
-    if (isArchive){
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheStatus.localLoadPath.toString(),
-                           link);
-      }
-      return cacheStatus.localLoadPath;
-    }
-    else {
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
-                           link);
-      }
-      return cacheFilePath(cacheStatus.localLoadPath);
-    }
+    // update cacheStatus to reflect the newly cached file
+    cacheStatus.mtime = getTimestamp(conf, cache);
+
+    LOG.info(String.format("Cached %s as %s",
+             cache.toString(), cacheStatus.localLoadPath));
+    return cacheStatus.localLoadPath;
   }
 
   private static boolean isTarFile(String filename) {
@@ -375,28 +401,22 @@
                                           CacheStatus lcacheStatus,
                                           FileStatus fileStatus)
   throws IOException {
-    // check for existence of the cache
-    if (lcacheStatus.currentStatus == false) {
-      return false;
+    long dfsFileStamp;
+    if (fileStatus != null) {
+      dfsFileStamp = fileStatus.getModificationTime();
     } else {
-      long dfsFileStamp;
-      if (fileStatus != null) {
-        dfsFileStamp = fileStatus.getModificationTime();
-      } else {
-        dfsFileStamp = getTimestamp(conf, cache);
-      }
+      dfsFileStamp = getTimestamp(conf, cache);
+    }
 
-      // ensure that the file on hdfs hasn't been modified since the job started
-      if (dfsFileStamp != confFileStamp) {
-        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
-        throw new IOException("File: " + cache +
-                              " has changed on HDFS since job started");
-      }
+    // ensure that the file on hdfs hasn't been modified since the job started
+    if (dfsFileStamp != confFileStamp) {
+      LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+      throw new IOException("File: " + cache +
+                            " has changed on HDFS since job started");
+    }
 
-      if (dfsFileStamp != lcacheStatus.mtime) {
-        // needs refreshing
-        return false;
-      }
+    if (dfsFileStamp != lcacheStatus.mtime) {
+      return false;
     }
 
     return true;
@@ -437,9 +457,6 @@
   }
 
   private static class CacheStatus {
-    // false, not loaded yet, true is loaded
-    boolean currentStatus;
-
     // the local load path of this cache
     Path localLoadPath;
 
@@ -455,15 +472,31 @@
     // the cache-file modification time
     long mtime;
 
+    // is it initialized ?
+    boolean inited = false;
+    
     public CacheStatus(Path baseDir, Path localLoadPath) {
       super();
-      this.currentStatus = false;
       this.localLoadPath = localLoadPath;
       this.refcount = 0;
       this.mtime = -1;
       this.baseDir = baseDir;
       this.size = 0;
     }
+    
+    Path getBaseDir(){
+      return this.baseDir;
+    }
+    
+    // mark it as initialized
+    void initComplete() {
+      inited = true;
+    }
+
+    // is it initialized?
+    boolean isInited() {
+      return inited;
+    }
   }
 
   /**

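The change above replaces one coarse critical section with two: the global
cachedArchives lock is now held only long enough to find-or-create an entry and
bump its refcount, while the slow copy/unarchive work runs under the per-entry
CacheStatus lock, so localizing one cache no longer blocks lookups of every
other cache. A minimal, self-contained sketch of the pattern (CacheStatus is
reduced to the fields the pattern needs; localize() stands in for the real
copy + unjar/unzip + chmod work):

    import java.util.HashMap;
    import java.util.Map;

    class TwoPhaseCacheSketch {
      static class CacheStatus {
        int refcount;
        boolean inited;                // read and written under the entry lock
        boolean isInited() { return inited; }
        void initComplete() { inited = true; }
      }

      private final Map<String, CacheStatus> cachedArchives =
          new HashMap<String, CacheStatus>();

      void use(String key) {           // key = relative path + timestamp, as in getKey()
        CacheStatus status;
        // Phase 1: global lock, held briefly to find-or-create and pin the entry.
        synchronized (cachedArchives) {
          status = cachedArchives.get(key);
          if (status == null) {
            status = new CacheStatus();
            cachedArchives.put(key, status);
          }
          status.refcount++;           // deleteCache() only removes entries at refcount 0
        }
        // Phase 2: per-entry lock; only the first caller pays for localization.
        synchronized (status) {
          if (!status.isInited()) {
            localize(key);             // placeholder for the real localization work
            status.initComplete();
          }
        }
      }

      private void localize(String key) { /* copy from DFS, unarchive, chmod */ }
    }

Because the map key now includes the file timestamp (getKey), a cache file
republished with a new mtime maps to a fresh entry, which is presumably why the
old in-place refresh path (the currentStatus flag and the "is in use and cannot
be refreshed" check) could be dropped.
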
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java Tue Oct 27 15:43:58 2009
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.io.Closeable;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.EOFException;
+import java.io.StringBufferInputStream;
 
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counter;
@@ -29,15 +31,16 @@
 import org.apache.hadoop.mapreduce.Counters;
 
 import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.io.Decoder;
-import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 
-public class EventReader {
+public class EventReader implements Closeable {
   private String version;
   private Schema schema;
-  private FSDataInputStream in;
+  private DataInputStream in;
   private Decoder decoder;
   private DatumReader reader;
 
@@ -57,7 +60,7 @@
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  public EventReader(FSDataInputStream in) throws IOException {
+  public EventReader(DataInputStream in) throws IOException {
     this.in = in;
     this.version = in.readLine();
     
@@ -65,10 +68,8 @@
       throw new IOException("Incompatible event log version: "+version);
     
     this.schema = Schema.parse(in.readLine());
-    this.reader =
-      new SpecificDatumReader(schema,
-                              "org.apache.hadoop.mapreduce.jobhistory.Events$");
-    this.decoder = new BinaryDecoder(in);
+    this.reader = new SpecificDatumReader(schema);
+    this.decoder = new JsonDecoder(schema, in);
   }
   
   /**
@@ -78,10 +79,10 @@
    */
   @SuppressWarnings("unchecked")
   public HistoryEvent getNextEvent() throws IOException {
-    Events.Event wrapper;
+    Event wrapper;
     try {
-      wrapper = (Events.Event)reader.read(null, decoder);
-    } catch (EOFException e) {
+      wrapper = (Event)reader.read(null, decoder);
+    } catch (AvroRuntimeException e) {            // at EOF
       return null;
     }
     HistoryEvent result;
@@ -153,6 +154,7 @@
    * Close the Event reader
    * @throws IOException
    */
+  @Override
   public void close() throws IOException {
     if (in != null) {
       in.close();
@@ -160,12 +162,12 @@
     in = null;
   }
 
-  static Counters fromAvro(Events.Counters counters) {
+  static Counters fromAvro(JhCounters counters) {
     Counters result = new Counters();
-    for (Events.CounterGroup g : counters.groups) {
+    for (JhCounterGroup g : counters.groups) {
       CounterGroup group =
         new CounterGroup(g.name.toString(), g.displayName.toString());
-      for (Events.Counter c : g.counts) {
+      for (JhCounter c : g.counts) {
         group.addCounter(new Counter(c.name.toString(),
                                      c.displayName.toString(),
                                      c.value));

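With the decoder switched from BinaryDecoder to JsonDecoder and the constructor
widened from FSDataInputStream to DataInputStream, a history log can be read
from any stream. A hedged usage sketch against the public API in this diff (the
local-file input is a stand-in; getNextEvent() returns null at end of log):

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.mapreduce.jobhistory.EventReader;
    import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;

    public class HistoryDump {
      public static void main(String[] args) throws IOException {
        EventReader reader =
            new EventReader(new DataInputStream(new FileInputStream(args[0])));
        try {
          HistoryEvent event;
          while ((event = reader.getNextEvent()) != null) {
            System.out.println(event.getEventType());
          }
        } finally {
          reader.close();   // EventReader now satisfies java.io.Closeable
        }
      }
    }
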
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Tue Oct 27 15:43:58 2009
@@ -28,7 +28,7 @@
 
 import org.apache.avro.Schema;
 import org.apache.avro.io.Encoder;
-import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.generic.GenericData;
@@ -41,27 +41,28 @@
  * 
  */
 class EventWriter {
-  static final String VERSION = "Avro-Binary";
+  static final String VERSION = "Avro-Json";
 
   private FSDataOutputStream out;
-  private DatumWriter<Object> writer =
-    new SpecificDatumWriter(Events.Event._SCHEMA);
+  private DatumWriter<Object> writer = new SpecificDatumWriter(Event.class);
   private Encoder encoder;
   
   EventWriter(FSDataOutputStream out) throws IOException {
     this.out = out;
     out.writeBytes(VERSION);
     out.writeBytes("\n");
-    out.writeBytes(Events.Event._SCHEMA.toString());
+    out.writeBytes(Event._SCHEMA.toString());
     out.writeBytes("\n");
-    this.encoder = new BinaryEncoder(out);
+    this.encoder = new JsonEncoder(Event._SCHEMA, out);
   }
   
   synchronized void write(HistoryEvent event) throws IOException { 
-    Events.Event wrapper = new Events.Event();
+    Event wrapper = new Event();
     wrapper.type = event.getEventType();
     wrapper.event = event.getDatum();
     writer.write(wrapper, encoder);
+    encoder.flush();
+    out.writeBytes("\n");
   }
   
   void flush() throws IOException { 
@@ -74,26 +75,26 @@
   }
 
   private static final Schema GROUPS =
-    Schema.createArray(Events.CounterGroup._SCHEMA);
+    Schema.createArray(JhCounterGroup._SCHEMA);
 
   private static final Schema COUNTERS =
-    Schema.createArray(Events.Counter._SCHEMA);
+    Schema.createArray(JhCounter._SCHEMA);
 
-  static Events.Counters toAvro(Counters counters) {
+  static JhCounters toAvro(Counters counters) {
     return toAvro(counters, "COUNTERS");
   }
-  static Events.Counters toAvro(Counters counters, String name) {
-    Events.Counters result = new Events.Counters();
+  static JhCounters toAvro(Counters counters, String name) {
+    JhCounters result = new JhCounters();
     result.name = new Utf8(name);
-    result.groups = new GenericData.Array<Events.CounterGroup>(0, GROUPS);
+    result.groups = new GenericData.Array<JhCounterGroup>(0, GROUPS);
     if (counters == null) return result;
     for (CounterGroup group : counters) {
-      Events.CounterGroup g = new Events.CounterGroup();
+      JhCounterGroup g = new JhCounterGroup();
       g.name = new Utf8(group.getName());
       g.displayName = new Utf8(group.getDisplayName());
-      g.counts = new GenericData.Array<Events.Counter>(group.size(), COUNTERS);
+      g.counts = new GenericData.Array<JhCounter>(group.size(), COUNTERS);
       for (Counter counter : group) {
-        Events.Counter c = new Events.Counter();
+        JhCounter c = new JhCounter();
         c.name = new Utf8(counter.getName());
         c.displayName = new Utf8(counter.getDisplayName());
         c.value = counter.getValue();

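On the write side, each write() now flushes the JSON encoder and appends a
newline, so a history file becomes line-oriented: a version line ("Avro-Json"),
the Event schema on one line, then one JSON-encoded event per line. A hedged
sketch, placed in the jobhistory package since EventWriter is package-private
(the JSON shapes in the comment are approximate):

    package org.apache.hadoop.mapreduce.jobhistory;

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;

    // Resulting file layout, roughly:
    //   Avro-Json
    //   {"type":"record","name":"Event", ... }
    //   {"type":"JOB_SUBMITTED","event":{ ... }}
    class EventWriterSketch {
      static void append(FSDataOutputStream out, HistoryEvent event)
          throws IOException {
        EventWriter writer = new EventWriter(out); // writes version + schema lines
        writer.write(event);                       // one JSON record, then "\n"
        writer.flush();
      }
    }
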
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Tue Oct 27 15:43:58 2009
@@ -21,7 +21,7 @@
 
  "types": [
 
-     {"type": "record", "name": "Counter",
+     {"type": "record", "name": "JhCounter",
       "fields": [
           {"name": "name", "type": "string"},
           {"name": "displayName", "type": "string"},
@@ -29,18 +29,18 @@
       ]
      },
 
-     {"type": "record", "name": "CounterGroup",
+     {"type": "record", "name": "JhCounterGroup",
       "fields": [
           {"name": "name", "type": "string"},
           {"name": "displayName", "type": "string"},
-          {"name": "counts", "type": {"type": "array", "items": "Counter"}}
+          {"name": "counts", "type": {"type": "array", "items": "JhCounter"}}
       ]
      },
 
-     {"type": "record", "name": "Counters",
+     {"type": "record", "name": "JhCounters",
       "fields": [
           {"name": "name", "type": "string"},
-          {"name": "groups", "type": {"type": "array", "items": "CounterGroup"}}
+          {"name": "groups", "type": {"type": "array", "items": "JhCounterGroup"}}
       ]
      },
 
@@ -52,9 +52,9 @@
           {"name": "finishedReduces", "type": "int"},
           {"name": "failedMaps", "type": "int"},
           {"name": "failedReduces", "type": "int"},
-          {"name": "totalCounters", "type": "Counters"},
-          {"name": "mapCounters", "type": "Counters"},
-          {"name": "reduceCounters", "type": "Counters"}
+          {"name": "totalCounters", "type": "JhCounters"},
+          {"name": "mapCounters", "type": "JhCounters"},
+          {"name": "reduceCounters", "type": "JhCounters"}
       ]
      },
 
@@ -120,7 +120,7 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "state", "type": "string"},
-          {"name": "counters", "type": "Counters"}
+          {"name": "counters", "type": "JhCounters"}
       ]
      },
 
@@ -135,7 +135,7 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "state", "type": "string"},
-          {"name": "counters", "type": "Counters"}
+          {"name": "counters", "type": "JhCounters"}
       ]
      },
 
@@ -148,7 +148,7 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "state", "type": "string"},
-          {"name": "counters", "type": "Counters"}
+          {"name": "counters", "type": "JhCounters"}
       ]
      },
 
@@ -192,7 +192,7 @@
           {"name": "taskType", "type": "string"},
           {"name": "finishTime", "type": "long"},
           {"name": "status", "type": "string"},
-          {"name": "counters", "type": "Counters"}
+          {"name": "counters", "type": "JhCounters"}
       ]
      },
 

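The Jh prefix on the counter records keeps the generated classes from colliding
with org.apache.hadoop.mapreduce.Counter, CounterGroup and Counters, which
EventReader and EventWriter import into the same package now that the Events
wrapper class is gone. A hedged in-package sketch of the round trip through the
helpers shown above (both are package-private statics):

    package org.apache.hadoop.mapreduce.jobhistory;

    import org.apache.hadoop.mapreduce.Counters;

    class CounterRoundTripSketch {
      static Counters roundTrip(Counters counters) {
        JhCounters avro = EventWriter.toAvro(counters, "COUNTERS");
        return EventReader.fromAvro(avro);
      }
    }
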
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java Tue Oct 27 15:43:58 2009
@@ -27,7 +27,7 @@
 public interface HistoryEvent {
 
   /** Return this event's type. */
-  Events.EventType getEventType();
+  EventType getEventType();
 
   /** Return the Avro datum wrapped by this. */
   Object getDatum();

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java Tue Oct 27 15:43:58 2009
@@ -30,7 +30,7 @@
  *
  */
 public class JobFinishedEvent  implements HistoryEvent {
-  private Events.JobFinished datum = new Events.JobFinished();
+  private JobFinished datum = new JobFinished();
 
   /** 
    * Create an event to record successful job completion
@@ -66,9 +66,9 @@
   JobFinishedEvent() {}
 
   public Object getDatum() { return datum; }
-  public void setDatum(Object datum) { this.datum = (Events.JobFinished)datum; }
-  public Events.EventType getEventType() {
-    return Events.EventType.JOB_FINISHED;
+  public void setDatum(Object datum) { this.datum = (JobFinished)datum; }
+  public EventType getEventType() {
+    return EventType.JOB_FINISHED;
   }
 
   /** Get the Job ID */

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Tue Oct 27 15:43:58 2009
@@ -29,16 +29,15 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobTracker;
@@ -312,7 +311,7 @@
 
   private void moveOldFiles() throws IOException {
     //move the log files remaining from last run to the DONE folder
-    //suffix the file name based on Jobtracker identifier so that history
+    //suffix the file name based on Job tracker identifier so that history
     //files with same job id don't get over written in case of recovery.
     FileStatus[] files = logDirFs.listStatus(logDir);
     String jtIdentifier = jobTracker.getTrackerIdentifier();
@@ -324,7 +323,25 @@
       }
       LOG.info("Moving log file from last run: " + fromPath);
       Path toPath = new Path(done, fromPath.getName() + fileSuffix);
-      moveToDoneNow(fromPath, toPath);
+      try {
+        moveToDoneNow(fromPath, toPath);
+      } catch (ChecksumException e) {
+        // If there is an exception moving the file to done because of
+        // a checksum exception, just delete it
+        LOG.warn("Unable to move " + fromPath +", deleting it");
+        try {
+          boolean b = logDirFs.delete(fromPath, false);
+          LOG.debug("Deletion of corrupt file " + fromPath + " returned " + b);
+        } catch (IOException ioe) {
+          // Cannot delete either? Just log and carry on
+          LOG.warn("Unable to delete " + fromPath + "Exception: " +
+              ioe.getMessage());
+        }
+      } catch (IOException e) {
+        // Exceptions other than checksum, just log and continue
+        LOG.warn("Error moving file " + fromPath + " to done folder." +
+            "Ignoring.");
+      }
     }
   }
   

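The order of the two catch clauses above is load-bearing:
org.apache.hadoop.fs.ChecksumException extends IOException, so the specific
clause must come first (javac rejects the reverse order as an unreachable
catch). A minimal demonstration of the dispatch:

    import java.io.IOException;

    import org.apache.hadoop.fs.ChecksumException;

    class CatchOrderSketch {
      static void move(boolean corrupt) {
        try {
          if (corrupt) throw new ChecksumException("bad crc", 0L);
          throw new IOException("other failure");
        } catch (ChecksumException e) {
          System.out.println("corrupt source file: delete it and move on");
        } catch (IOException e) {
          System.out.println("log and ignore: " + e.getMessage());
        }
      }
    }
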
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Tue Oct 27 15:43:58 2009
@@ -105,7 +105,7 @@
   }
   
   private void handleEvent(HistoryEvent event) throws IOException { 
-    Events.EventType type = event.getEventType();
+    EventType type = event.getEventType();
 
     switch (type) {
     case JOB_SUBMITTED:

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java Tue Oct 27 15:43:58 2009
@@ -29,7 +29,7 @@
  * a job
  */
 public class JobInfoChangeEvent implements HistoryEvent {
-  private Events.JobInfoChange datum = new Events.JobInfoChange();
+  private JobInfoChange datum = new JobInfoChange();
 
   /** 
    * Create a event to record the submit and launch time of a job
@@ -47,7 +47,7 @@
 
   public Object getDatum() { return datum; }
   public void setDatum(Object datum) {
-    this.datum = (Events.JobInfoChange)datum;
+    this.datum = (JobInfoChange)datum;
   }
 
   /** Get the Job ID */
@@ -57,8 +57,8 @@
   /** Get the Job launch time */
   public long getLaunchTime() { return datum.launchTime; }
 
-  public Events.EventType getEventType() {
-    return Events.EventType.JOB_INFO_CHANGED;
+  public EventType getEventType() {
+    return EventType.JOB_INFO_CHANGED;
   }
 
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java Tue Oct 27 15:43:58 2009
@@ -29,7 +29,7 @@
  *
  */
 public class JobInitedEvent implements HistoryEvent {
-  private Events.JobInited datum = new Events.JobInited();
+  private JobInited datum = new JobInited();
 
   /**
    * Create an event to record job initialization
@@ -51,7 +51,7 @@
   JobInitedEvent() { }
 
   public Object getDatum() { return datum; }
-  public void setDatum(Object datum) { this.datum = (Events.JobInited)datum; }
+  public void setDatum(Object datum) { this.datum = (JobInited)datum; }
 
   /** Get the job ID */
   public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
@@ -64,8 +64,8 @@
   /** Get the status */
   public String getStatus() { return datum.jobStatus.toString(); }
  /** Get the event type */
-  public Events.EventType getEventType() {
-    return Events.EventType.JOB_INITED;
+  public EventType getEventType() {
+    return EventType.JOB_INITED;
   }
 
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java Tue Oct 27 15:43:58 2009
@@ -30,7 +30,7 @@
  *
  */
 public class JobPriorityChangeEvent implements HistoryEvent {
-  private Events.JobPriorityChange datum = new Events.JobPriorityChange();
+  private JobPriorityChange datum = new JobPriorityChange();
 
   /** Generate an event to record changes in Job priority
    * @param id Job Id
@@ -45,7 +45,7 @@
 
   public Object getDatum() { return datum; }
   public void setDatum(Object datum) {
-    this.datum = (Events.JobPriorityChange)datum;
+    this.datum = (JobPriorityChange)datum;
   }
 
   /** Get the Job ID */
@@ -55,8 +55,8 @@
     return JobPriority.valueOf(datum.priority.toString());
   }
   /** Get the event type */
-  public Events.EventType getEventType() {
-    return Events.EventType.JOB_PRIORITY_CHANGED;
+  public EventType getEventType() {
+    return EventType.JOB_PRIORITY_CHANGED;
   }
 
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java Tue Oct 27 15:43:58 2009
@@ -29,7 +29,7 @@
  *
  */
 public class JobStatusChangedEvent implements HistoryEvent {
-  private Events.JobStatusChanged datum = new Events.JobStatusChanged();
+  private JobStatusChanged datum = new JobStatusChanged();
 
   /**
    * Create an event to record the change in the Job Status
@@ -45,7 +45,7 @@
 
   public Object getDatum() { return datum; }
   public void setDatum(Object datum) {
-    this.datum = (Events.JobStatusChanged)datum;
+    this.datum = (JobStatusChanged)datum;
   }
 
   /** Get the Job Id */
@@ -53,8 +53,8 @@
   /** Get the event status */
   public String getStatus() { return datum.jobStatus.toString(); }
   /** Get the event type */
-  public Events.EventType getEventType() {
-    return Events.EventType.JOB_STATUS_CHANGED;
+  public EventType getEventType() {
+    return EventType.JOB_STATUS_CHANGED;
   }
 
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java Tue Oct 27 15:43:58 2009
@@ -29,7 +29,7 @@
  *
  */
 public class JobSubmittedEvent implements HistoryEvent {
-  private Events.JobSubmitted datum = new Events.JobSubmitted();
+  private JobSubmitted datum = new JobSubmitted();
 
   /**
    * Create an event to record job submission
@@ -52,7 +52,7 @@
 
   public Object getDatum() { return datum; }
   public void setDatum(Object datum) {
-    this.datum = (Events.JobSubmitted)datum;
+    this.datum = (JobSubmitted)datum;
   }
 
   /** Get the Job Id */
@@ -66,6 +66,6 @@
   /** Get the Path for the Job Configuration file */
   public String getJobConfPath() { return datum.jobConfPath.toString(); }
   /** Get the event type */
-  public Events.EventType getEventType() { return Events.EventType.JOB_SUBMITTED; }
+  public EventType getEventType() { return EventType.JOB_SUBMITTED; }
 
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java Tue Oct 27 15:43:58 2009
@@ -29,8 +29,8 @@
  *
  */
 public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
-  private Events.JobUnsuccessfulCompletion datum
-    = new Events.JobUnsuccessfulCompletion();
+  private JobUnsuccessfulCompletion datum
+    = new JobUnsuccessfulCompletion();
 
   /**
    * Create an event to record unsuccessful completion (killed/failed) of jobs
@@ -54,7 +54,7 @@
 
   public Object getDatum() { return datum; }
   public void setDatum(Object datum) {
-    this.datum = (Events.JobUnsuccessfulCompletion)datum;
+    this.datum = (JobUnsuccessfulCompletion)datum;
   }
 
   /** Get the Job ID */
@@ -68,11 +68,11 @@
   /** Get the status */
   public String getStatus() { return datum.jobStatus.toString(); }
   /** Get the event type */
-  public Events.EventType getEventType() {
+  public EventType getEventType() {
     if ("FAILED".equals(getStatus())) {
-      return Events.EventType.JOB_FAILED;
+      return EventType.JOB_FAILED;
     } else
-      return Events.EventType.JOB_KILLED;
+      return EventType.JOB_KILLED;
   }
 
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java Tue Oct 27 15:43:58 2009
@@ -32,7 +32,7 @@
  *
  */
 public class MapAttemptFinishedEvent  implements HistoryEvent {
-  private Events.MapAttemptFinished datum = new Events.MapAttemptFinished();
+  private MapAttemptFinished datum = new MapAttemptFinished();
   
   /** 
    * Create an event for successful completion of map attempts
@@ -64,7 +64,7 @@
 
   public Object getDatum() { return datum; }
   public void setDatum(Object datum) {
-    this.datum = (Events.MapAttemptFinished)datum;
+    this.datum = (MapAttemptFinished)datum;
   }
 
   /** Get the task ID */
@@ -90,8 +90,8 @@
   /** Get the counters */
   Counters getCounters() { return EventReader.fromAvro(datum.counters); }
   /** Get the event type */
-   public Events.EventType getEventType() {
-    return Events.EventType.MAP_ATTEMPT_FINISHED;
+   public EventType getEventType() {
+    return EventType.MAP_ATTEMPT_FINISHED;
   }
   
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java Tue Oct 27 15:43:58 2009
@@ -32,8 +32,8 @@
  *
  */
 public class ReduceAttemptFinishedEvent  implements HistoryEvent {
-  private Events.ReduceAttemptFinished datum =
-    new Events.ReduceAttemptFinished();
+  private ReduceAttemptFinished datum =
+    new ReduceAttemptFinished();
 
   /**
    * Create an event to record completion of a reduce attempt
@@ -68,7 +68,7 @@
 
   public Object getDatum() { return datum; }
   public void setDatum(Object datum) {
-    this.datum = (Events.ReduceAttemptFinished)datum;
+    this.datum = (ReduceAttemptFinished)datum;
   }
 
   /** Get the Task ID */
@@ -96,8 +96,8 @@
   /** Get the counters for the attempt */
   Counters getCounters() { return EventReader.fromAvro(datum.counters); }
   /** Get the event type */
-  public Events.EventType getEventType() {
-    return Events.EventType.REDUCE_ATTEMPT_FINISHED;
+  public EventType getEventType() {
+    return EventType.REDUCE_ATTEMPT_FINISHED;
   }
 
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java Tue Oct 27 15:43:58 2009
@@ -32,7 +32,7 @@
  *
  */
 public class TaskAttemptFinishedEvent  implements HistoryEvent {
-  private Events.TaskAttemptFinished datum = new Events.TaskAttemptFinished();
+  private TaskAttemptFinished datum = new TaskAttemptFinished();
 
   /**
    * Create an event to record successful finishes for setup and cleanup 
@@ -63,7 +63,7 @@
 
   public Object getDatum() { return datum; }
   public void setDatum(Object datum) {
-    this.datum = (Events.TaskAttemptFinished)datum;
+    this.datum = (TaskAttemptFinished)datum;
   }
 
   /** Get the task ID */
@@ -87,8 +87,8 @@
   /** Get the counters for the attempt */
   Counters getCounters() { return EventReader.fromAvro(datum.counters); }
   /** Get the event type */
-  public Events.EventType getEventType() {
-    return Events.EventType.MAP_ATTEMPT_FINISHED;
+  public EventType getEventType() {
+    return EventType.MAP_ATTEMPT_FINISHED;
   }
 
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java Tue Oct 27 15:43:58 2009
@@ -31,7 +31,7 @@
  *
  */
 public class TaskAttemptStartedEvent implements HistoryEvent {
-  private Events.TaskAttemptStarted datum = new Events.TaskAttemptStarted();
+  private TaskAttemptStarted datum = new TaskAttemptStarted();
 
   /**
    * Create an event to record the start of an attempt
@@ -56,7 +56,7 @@
 
   public Object getDatum() { return datum; }
   public void setDatum(Object datum) {
-    this.datum = (Events.TaskAttemptStarted)datum;
+    this.datum = (TaskAttemptStarted)datum;
   }
 
   /** Get the task id */
@@ -76,8 +76,8 @@
     return TaskAttemptID.forName(datum.attemptId.toString());
   }
   /** Get the event type */
-  public Events.EventType getEventType() {
-    return Events.EventType.MAP_ATTEMPT_STARTED;
+  public EventType getEventType() {
+    return EventType.MAP_ATTEMPT_STARTED;
   }
 
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java Tue Oct 27 15:43:58 2009
@@ -31,8 +31,8 @@
  *
  */
 public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
-  private Events.TaskAttemptUnsuccessfulCompletion datum =
-    new Events.TaskAttemptUnsuccessfulCompletion();
+  private TaskAttemptUnsuccessfulCompletion datum =
+    new TaskAttemptUnsuccessfulCompletion();
 
   /** 
    * Create an event to record the unsuccessful completion of attempts
@@ -60,7 +60,7 @@
 
   public Object getDatum() { return datum; }
   public void setDatum(Object datum) {
-    this.datum = (Events.TaskAttemptUnsuccessfulCompletion)datum;
+    this.datum = (TaskAttemptUnsuccessfulCompletion)datum;
   }
 
   /** Get the task id */
@@ -82,8 +82,8 @@
   /** Get the task status */
   public String getTaskStatus() { return datum.status.toString(); }
   /** Get the event type */
-  public Events.EventType getEventType() {
-    return Events.EventType.MAP_ATTEMPT_KILLED;
+  public EventType getEventType() {
+    return EventType.MAP_ATTEMPT_KILLED;
   }
 
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java Tue Oct 27 15:43:58 2009
@@ -31,7 +31,7 @@
  *
  */
 public class TaskFailedEvent implements HistoryEvent {
-  private Events.TaskFailed datum = new Events.TaskFailed();
+  private TaskFailed datum = new TaskFailed();
 
   /**
    * Create an event to record task failure
@@ -58,7 +58,7 @@
   TaskFailedEvent() {}
 
   public Object getDatum() { return datum; }
-  public void setDatum(Object datum) { this.datum = (Events.TaskFailed)datum; }
+  public void setDatum(Object datum) { this.datum = (TaskFailed)datum; }
 
   /** Get the task id */
   public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
@@ -79,7 +79,7 @@
   /** Get the task status */
   public String getTaskStatus() { return datum.status.toString(); }
   /** Get the event type */
-  public Events.EventType getEventType() { return Events.EventType.TASK_FAILED; }
+  public EventType getEventType() { return EventType.TASK_FAILED; }
 
   
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java Tue Oct 27 15:43:58 2009
@@ -31,7 +31,7 @@
  *
  */
 public class TaskFinishedEvent implements HistoryEvent {
-  private Events.TaskFinished datum = new Events.TaskFinished();
+  private TaskFinished datum = new TaskFinished();
   
   /**
    * Create an event to record the successful completion of a task
@@ -55,7 +55,7 @@
 
   public Object getDatum() { return datum; }
   public void setDatum(Object datum) {
-    this.datum = (Events.TaskFinished)datum;
+    this.datum = (TaskFinished)datum;
   }
 
   /** Get task id */
@@ -71,8 +71,8 @@
   /** Get task status */
   public String getTaskStatus() { return datum.status.toString(); }
   /** Get event type */
-  public Events.EventType getEventType() {
-    return Events.EventType.TASK_FINISHED;
+  public EventType getEventType() {
+    return EventType.TASK_FINISHED;
   }
 
   

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java Tue Oct 27 15:43:58 2009
@@ -30,7 +30,7 @@
  *
  */
 public class TaskStartedEvent implements HistoryEvent {
-  private Events.TaskStarted datum = new Events.TaskStarted();
+  private TaskStarted datum = new TaskStarted();
 
   /**
    * Create an event to record start of a task
@@ -50,7 +50,7 @@
   TaskStartedEvent() {}
 
   public Object getDatum() { return datum; }
-  public void setDatum(Object datum) { this.datum = (Events.TaskStarted)datum; }
+  public void setDatum(Object datum) { this.datum = (TaskStarted)datum; }
 
   /** Get the task id */
   public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
@@ -63,8 +63,8 @@
     return TaskType.valueOf(datum.taskType.toString());
   }
   /** Get the event type */
-  public Events.EventType getEventType() {
-    return Events.EventType.TASK_STARTED;
+  public EventType getEventType() {
+    return EventType.TASK_STARTED;
   }
 
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java Tue Oct 27 15:43:58 2009
@@ -29,7 +29,7 @@
  *
  */
 public class TaskUpdatedEvent implements HistoryEvent {
-  private Events.TaskUpdated datum = new Events.TaskUpdated();
+  private TaskUpdated datum = new TaskUpdated();
 
   /**
    * Create an event to record task updates
@@ -44,15 +44,15 @@
   TaskUpdatedEvent() {}
 
   public Object getDatum() { return datum; }
-  public void setDatum(Object datum) { this.datum = (Events.TaskUpdated)datum; }
+  public void setDatum(Object datum) { this.datum = (TaskUpdated)datum; }
 
   /** Get the task ID */
   public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
   /** Get the task finish time */
   public long getFinishTime() { return datum.finishTime; }
   /** Get the event type */
-  public Events.EventType getEventType() {
-    return Events.EventType.TASK_UPDATED;
+  public EventType getEventType() {
+    return EventType.TASK_UPDATED;
   }
 
 }

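The recurring edit across these jobhistory event classes drops the Events. outer-class qualifier, which suggests the generated Avro record types (TaskFailed, TaskFinished, and so on) and the EventType enum now live as top-level types in org.apache.hadoop.mapreduce.jobhistory. A minimal sketch of consumer code under that assumption; the dispatcher class and method names here are hypothetical, only getEventType() and the enum constants come from the diffs above:

  import org.apache.hadoop.mapreduce.jobhistory.EventType;
  import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;

  // Hypothetical consumer: dispatches on the flattened EventType enum
  // rather than the old nested Events.EventType.
  class HistoryEventDispatcher {
    void dispatch(HistoryEvent event) {
      switch (event.getEventType()) {
        case TASK_STARTED:
          // e.g. record the start time from a TaskStartedEvent
          break;
        case TASK_FAILED:
        case TASK_FINISHED:
        case TASK_UPDATED:
          // e.g. update bookkeeping from the corresponding event
          break;
        default:
          break;
      }
    }
  }
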
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java Tue Oct 27 15:43:58 2009
@@ -70,7 +70,7 @@
 
   private Connection connection;
 
-  private PreparedStatement statement;
+  protected PreparedStatement statement;
 
   private DBConfiguration dbConf;
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java Tue Oct 27 15:43:58 2009
@@ -38,10 +38,9 @@
 
   // Execute statements for mysql in unbuffered mode.
   protected ResultSet executeQuery(String query) throws SQLException {
-    PreparedStatement statement = getConnection().prepareStatement(query,
+    statement = getConnection().prepareStatement(query,
       ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
     statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
-    setStatement(statement); // save a ref for cleanup in close()
     return statement.executeQuery();
   }
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java Tue Oct 27 15:43:58 2009
@@ -39,10 +39,9 @@
 
   // Execute statements for mysql in unbuffered mode.
   protected ResultSet executeQuery(String query) throws SQLException {
-    PreparedStatement statement = getConnection().prepareStatement(query,
+    statement = getConnection().prepareStatement(query,
       ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
     statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
-    setStatement(statement); // save a ref so the close() method cleans this up.
     return statement.executeQuery();
   }
 }

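Both MySQL readers now assign their prepared statement into the statement field made protected in DBRecordReader above, instead of stashing it through setStatement(). Presumably the point is that the base class's cleanup path closes whatever the field references; a hedged sketch of that pattern (this base/subclass pair is illustrative, not the actual DBRecordReader code):

  import java.sql.PreparedStatement;
  import java.sql.SQLException;

  // Illustrative sketch mirroring the change above: the subclass writes
  // into the inherited protected field so the base class can clean it
  // up without a setter round-trip.
  abstract class BaseReader {
    protected PreparedStatement statement;

    public void close() throws SQLException {
      if (statement != null) {
        statement.close(); // closes whatever the subclass assigned
      }
    }
  }
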
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Tue Oct 27 15:43:58 2009
@@ -23,10 +23,12 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -43,6 +45,9 @@
    * Temporary directory name 
    */
   protected static final String TEMP_DIR_NAME = "_temporary";
+  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
+    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
   private FileSystem outputFileSystem = null;
   private Path outputPath = null;
   private Path workPath = null;
@@ -80,21 +85,59 @@
     }
   }
 
+  // True if the job's output directory should be marked on successful
+  // completion. Note that this defaults to true.
+  private boolean shouldMarkOutputDir(Configuration conf) {
+    return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
+  }
+  
+  // Create a _SUCCESS file in the job's output dir
+  private void markOutputDirSuccessful(JobContext context) throws IOException {
+    if (outputPath != null) {
+      // create a file in the output folder to mark the job completion
+      Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+      outputFileSystem.create(filePath).close();
+    }
+  }
+  
   /**
    * Delete the temporary directory, including all of the work directories.
+   * Create a _SUCCESS file to mark it as successful.
    * @param context the job's context
    */
-  public void cleanupJob(JobContext context) throws IOException {
+  public void commitJob(JobContext context) throws IOException {
+    // delete the _temporary folder and create a _SUCCESS file in the output folder
+    cleanup(context);
+    if (shouldMarkOutputDir(context.getConfiguration())) {
+      markOutputDirSuccessful(context);
+    }
+  }
+  
+  // Delete the _temporary folder in the output dir.
+  private void cleanup(JobContext context) throws IOException {
     if (outputPath != null) {
       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
       FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
       if (fileSys.exists(tmpDir)) {
         fileSys.delete(tmpDir, true);
       }
+    } else {
+      LOG.warn("Output Path is null in cleanup");
     }
   }
 
   /**
+   * Delete the temporary directory, including all of the work directories.
+   * @param context the job's context
+   */
+  @Override
+  public void abortJob(JobContext context, JobStatus.State state) 
+  throws IOException {
+    // delete the _temporary folder
+    cleanup(context);
+  }
+  
+  /**
    * No task setup required.
    */
   @Override

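With commitJob now dropping a _SUCCESS marker (unless mapreduce.fileoutputcommitter.marksuccessfuljobs is set to false), downstream consumers can cheaply test whether an output directory came from a job that committed. A minimal sketch, assuming the consumer already has a Configuration and the output Path in hand; the wrapper class is hypothetical, SUCCEEDED_FILE_NAME is the constant added above:

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

  // Hedged sketch: check for the _SUCCESS marker before reading output.
  public class OutputReadyCheck {
    public static boolean isJobOutputReady(Configuration conf, Path outputDir)
        throws IOException {
      FileSystem fs = outputDir.getFileSystem(conf);
      Path marker = new Path(outputDir, FileOutputCommitter.SUCCEEDED_FILE_NAME);
      return fs.exists(marker);
    }
  }
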
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Tue Oct 27 15:43:58 2009
@@ -85,8 +85,10 @@
    * Version 27: Changed protocol to use new api objects. And the protocol is 
    *             renamed from JobSubmissionProtocol to ClientProtocol.
    * Version 28: Added getJobHistoryDir() as part of MAPREDUCE-975.
+   * Version 29: Added reservedSlots, runningTasks and totalJobSubmissions
+   *             to ClusterMetrics as part of MAPREDUCE-1048.
    */
-  public static final long versionID = 28L;
+  public static final long versionID = 29L;
 
   /**
    * Allocate a name for the job.

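The versionID bump to 29 follows the usual RPC discipline: any change to the wire-visible surface (here, the new ClusterMetrics fields) increments the constant so mismatched clients fail fast. A hedged sketch of the client-side guard, assuming the proxy exposes the standard getProtocolVersion call inherited from VersionedProtocol; the wrapper class itself is hypothetical:

  import java.io.IOException;
  import org.apache.hadoop.mapreduce.protocol.ClientProtocol;

  class VersionGuard {
    // proxy is a hypothetical RPC proxy implementing ClientProtocol.
    static void checkVersion(ClientProtocol proxy) throws IOException {
      long serverVersion = proxy.getProtocolVersion(
          ClientProtocol.class.getName(), ClientProtocol.versionID);
      if (serverVersion != ClientProtocol.versionID) {
        throw new IOException("ClientProtocol version mismatch: client="
            + ClientProtocol.versionID + " server=" + serverVersion);
      }
    }
  }
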
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java Tue Oct 27 15:43:58 2009
@@ -41,6 +41,8 @@
     "mapreduce.jobtracker.maxtasks.perjob";
   public static final String JT_HEARTBEATS_IN_SECOND = 
     "mapreduce.jobtracker.heartbeats.in.second";
+  public static final String JT_HEARTBEATS_SCALING_FACTOR = 
+    "mapreduce.jobtracker.heartbeats.scaling.factor";
   public static final String JT_PERSIST_JOBSTATUS = 
     "mapreduce.jobtracker.persist.jobstatus.active";
   public static final String JT_PERSIST_JOBSTATUS_HOURS = 

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Tue Oct 27 15:43:58 2009
@@ -75,5 +75,6 @@
     "mapreduce.tasktracker.taskmemorymanager.monitoringinterval";
   public static final String TT_LOCAL_CACHE_SIZE = 
     "mapreduce.tasktracker.cache.local.size";
-
+  public static final String TT_OUTOFBAND_HEARBEAT =
+    "mapreduce.tasktracker.outofband.heartbeat";
 }

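Taken together with the JT_HEARTBEATS_SCALING_FACTOR key added to JTConfig above, these constants are just property names; a hedged sketch of how a daemon might read them. The defaults shown (4.0f, false) are illustrative assumptions, not values taken from this commit:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
  import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;

  // Hedged sketch: reads the two new heartbeat-tuning keys.
  class HeartbeatSettings {
    final float scalingFactor;
    final boolean outOfBandHeartbeat;

    HeartbeatSettings(Configuration conf) {
      scalingFactor = conf.getFloat(JTConfig.JT_HEARTBEATS_SCALING_FACTOR, 4.0f);
      // Note: the constant name is spelled TT_OUTOFBAND_HEARBEAT in this commit.
      outOfBandHeartbeat = conf.getBoolean(TTConfig.TT_OUTOFBAND_HEARBEAT, false);
    }
  }
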
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java Tue Oct 27 15:43:58 2009
@@ -50,8 +50,8 @@
       setsidSupported = false;
     } finally { // handle the exit code
       LOG.info("setsid exited with exit code " + shexec.getExitCode());
-      return setsidSupported;
     }
+    return setsidSupported;
   }
 
   /**

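The ProcessTree hunk moves return setsidSupported; out of the finally block. Returning from finally silently discards any exception propagating from the try body (and draws a javac lint warning); a small self-contained illustration of the difference:

  // Demonstrates why returning from finally is harmful: the exception
  // thrown in the try body is swallowed and the caller just sees 'false'.
  public class FinallyReturnDemo {
    static boolean broken() {
      try {
        throw new RuntimeException("lost forever");
      } finally {
        return false; // swallows the RuntimeException
      }
    }

    static boolean fixed() {
      boolean ok = false;
      try {
        ok = true;
      } finally {
        // log/cleanup only; no return here
      }
      return ok; // exceptions from the try body still propagate
    }

    public static void main(String[] args) {
      System.out.println(broken()); // prints false, no exception surfaces
      System.out.println(fixed());  // prints true
    }
  }
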
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java Tue Oct 27 15:43:58 2009
@@ -25,6 +25,7 @@
  *             {@link org.apache.hadoop.mapreduce.util.LinuxMemoryCalculatorPlugin}
  *             instead
  */
+@Deprecated
 public class LinuxMemoryCalculatorPlugin extends
     org.apache.hadoop.mapreduce.util.LinuxMemoryCalculatorPlugin {
   // Inherits everything from the super class

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/findbugsExcludeFile.xml?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/findbugsExcludeFile.xml Tue Oct 27 15:43:58 2009
@@ -16,6 +16,10 @@
        <Bug pattern="DLS_DEAD_LOCAL_STORE" />
      </Match>
      <Match>
+       <Class name="~.*_jspx" />
+       <Bug pattern="DLS_DEAD_LOCAL_STORE" />
+     </Match>
+     <Match>
        <Field name="_jspx_dependants" />
        <Bug pattern="UWF_UNWRITTEN_FIELD" />
      </Match>
@@ -64,6 +68,14 @@
        <Class name="org.apache.hadoop.mapred.SequenceFileInputFilter$Filter" />
        <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
      </Match>
+     <Match>
+       <Class name="~org.apache.hadoop.util.*" />
+       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
+     </Match>
+     <Match>
+       <Class name="~org.apache.hadoop.filecache.*" />
+       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
+     </Match>
      <!--
        Ignore warnings for usage of System.exit. This is
        required and have been well thought out
@@ -116,6 +128,20 @@
        <Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
        <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
      </Match>
+     <Match>
+       <Class name="org.apache.hadoop.mapred.OutputCommitter" />
+       <Or>
+       <Method name="abortJob" />
+       <Method name="commitJob" />
+       <Method name="cleanupJob" />
+       </Or>
+       <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+     </Match>
+     <Match>
+       <Class name="org.apache.hadoop.mapred.lib.db.DBInputFormat$DBRecordReader" />
+       <Method name="next" />
+       <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+     </Match>
      <!--
        Ignoring this warning as resolving this would need a non-trivial change in code 
      -->
@@ -163,6 +189,11 @@
        <Field name="inputs" />
        <Bug pattern="IS2_INCONSISTENT_SYNC" />
      </Match>
+     <Match>
+       <Class name="org.apache.hadoop.mapred.JobTracker" />
+       <Method name="updateTaskTrackerStatus" />
+       <Bug pattern="DLS_DEAD_LOCAL_STORE" />
+     </Match>
 
     <!--
      This class is unlikely to get subclassed, so ignore

Propchange: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/test/mapred:713112
 /hadoop/core/trunk/src/test/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred:817879-818559
+/hadoop/mapreduce/trunk/src/test/mapred:817878-830225

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestJobConf.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestJobConf.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestJobConf.java Tue Oct 27 15:43:58 2009
@@ -17,14 +17,15 @@
  */
 package org.apache.hadoop.conf;
 
-import junit.framework.Assert;
-import junit.framework.TestCase;
+import org.junit.Assert;
+import org.junit.Test;
 
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
 
-public class TestJobConf extends TestCase {
+public class TestJobConf {
 
+  @Test
   public void testProfileParamsDefaults() {
     JobConf configuration = new JobConf();
 
@@ -37,6 +38,7 @@
     Assert.assertTrue(result.startsWith("-agentlib:hprof"));
   }
 
+  @Test
   public void testProfileParamsSetter() {
     JobConf configuration = new JobConf();
 
@@ -44,6 +46,7 @@
     Assert.assertEquals("test", configuration.get(JobContext.TASK_PROFILE_PARAMS));
   }
 
+  @Test
   public void testProfileParamsGetter() {
     JobConf configuration = new JobConf();
 
@@ -55,6 +58,7 @@
    * Testing mapred.task.maxvmem replacement with new values
    *
    */
+  @Test
   public void testMemoryConfigForMapOrReduceTask(){
     JobConf configuration = new JobConf();
     configuration.set(JobContext.MAP_MEMORY_MB,String.valueOf(300));
@@ -71,9 +75,9 @@
     configuration = new JobConf();
     configuration.set("mapred.task.maxvmem" , "-1");
     configuration.set(JobContext.MAP_MEMORY_MB,String.valueOf(300));
-    configuration.set(JobContext.REDUCE_MEMORY_MB,String.valueOf(300));
-    Assert.assertEquals(configuration.getMemoryForMapTask(),-1);
-    Assert.assertEquals(configuration.getMemoryForReduceTask(),-1);
+    configuration.set(JobContext.REDUCE_MEMORY_MB,String.valueOf(400));
+    Assert.assertEquals(configuration.getMemoryForMapTask(), 300);
+    Assert.assertEquals(configuration.getMemoryForReduceTask(), 400);
 
     configuration = new JobConf();
     configuration.set("mapred.task.maxvmem" , String.valueOf(2*1024 * 1024));
@@ -91,13 +95,53 @@
 
     configuration = new JobConf();
     configuration.set("mapred.task.maxvmem" , String.valueOf(2*1024 * 1024));
+    configuration.set(JobContext.MAP_MEMORY_MB, "3");
+    configuration.set(JobContext.REDUCE_MEMORY_MB, "3");
     Assert.assertEquals(configuration.getMemoryForMapTask(),2);
     Assert.assertEquals(configuration.getMemoryForReduceTask(),2);
+    
   }
 
   /**
+   * Test that negative values for MAPRED_TASK_MAXVMEM_PROPERTY cause
+   * new configuration keys' values to be used.
+   */
+  @Test
+  public void testNegativeValueForTaskVmem() {
+    JobConf configuration = new JobConf();
+    
+    configuration.set(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, "-3");
+    configuration.set(JobContext.MAP_MEMORY_MB, "4");
+    configuration.set(JobContext.REDUCE_MEMORY_MB, "5");
+    Assert.assertEquals(4, configuration.getMemoryForMapTask());
+    Assert.assertEquals(5, configuration.getMemoryForReduceTask());
+    
+  }
+  
+  /**
+   * Test that negative values for all memory configuration properties cause
+   * APIs to disable memory limits
+   */
+  @Test
+  public void testNegativeValuesForMemoryParams() {
+    JobConf configuration = new JobConf();
+    
+    configuration.set(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, "-4");
+    configuration.set(JobContext.MAP_MEMORY_MB, "-5");
+    configuration.set(JobContext.REDUCE_MEMORY_MB, "-6");
+    
+    Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
+                        configuration.getMemoryForMapTask());
+    Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
+                        configuration.getMemoryForReduceTask());
+    Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
+                        configuration.getMaxVirtualMemoryForTask());
+  }
+  
+  /**
    *   Test deprecated accessor and mutator method for mapred.task.maxvmem
    */
+  @Test
   public void testMaxVirtualMemoryForTask() {
     JobConf configuration = new JobConf();
 
@@ -138,5 +182,7 @@
     configuration.setMaxVirtualMemoryForTask(2 * 1024 * 1024);
     Assert.assertEquals(configuration.getMemoryForMapTask(), 2);
     Assert.assertEquals(configuration.getMemoryForReduceTask(), 2);
+    
+    
   }
 }

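The TestJobConf changes follow the standard JUnit 3 to JUnit 4 migration: drop the TestCase superclass, import org.junit.Assert and org.junit.Test, and annotate each test method with @Test. The same recipe in miniature, using a hypothetical TestFoo:

  // Before (JUnit 3): methods must start with "test" and the class
  // must extend junit.framework.TestCase.
  //
  //   public class TestFoo extends TestCase {
  //     public void testBar() { assertEquals(1, 1); }
  //   }

  // After (JUnit 4): plain class, annotated methods, static Assert calls.
  import org.junit.Assert;
  import org.junit.Test;

  public class TestFoo {
    @Test
    public void testBar() {
      Assert.assertEquals(1, 1);
    }
  }
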
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Tue Oct 27 15:43:58 2009
@@ -84,7 +84,7 @@
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(outDir,
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

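Several tests below repeat this same substitution: OutputLogFilter gives way to Utils.OutputFileUtils.OutputFilesFilter when listing job output. A hedged sketch of the listing idiom, assuming Utils here is org.apache.hadoop.mapred.Utils as the diffs suggest; the helper class is hypothetical:

  import java.io.IOException;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.FileUtil;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.Utils;

  // Hedged sketch: list only real output files, skipping side files
  // such as logs and job markers.
  class OutputLister {
    static Path[] listOutputs(FileSystem fs, Path outDir) throws IOException {
      FileStatus[] stats = fs.listStatus(
          outDir, new Utils.OutputFileUtils.OutputFilesFilter());
      return FileUtil.stat2Paths(stats);
    }
  }
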
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Tue Oct 27 15:43:58 2009
@@ -257,7 +257,9 @@
    */
   protected void assertOwnerShip(Path outDir, FileSystem fs)
       throws IOException {
-    for (FileStatus status : fs.listStatus(outDir, new OutputLogFilter())) {
+    for (FileStatus status : fs.listStatus(outDir, 
+                                           new Utils.OutputFileUtils
+                                                    .OutputFilesFilter())) {
       String owner = status.getOwner();
       String group = status.getGroup();
       LOG.info("Ownership of the file is " + status.getPath() + " is " + owner

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Tue Oct 27 15:43:58 2009
@@ -31,6 +31,7 @@
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.Job.RawSplit;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 
@@ -188,6 +189,7 @@
         String taskTracker) {
       addRunningTaskToTIP(tip, taskId, new TaskTrackerStatus(taskTracker,
           JobInProgress.convertTrackerNameToHostName(taskTracker)), true);
+
       TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, 
           0.0f, 1, TaskStatus.State.RUNNING, "", "", taskTracker,
           tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
@@ -221,7 +223,7 @@
   }
   
   static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status, 
-                                             boolean initialContact, 
+                                             boolean initialContact, boolean acceptNewTasks,
                                              String tracker, short responseId) 
     throws IOException {
     if (status == null) {
@@ -229,15 +231,39 @@
           JobInProgress.convertTrackerNameToHostName(tracker));
 
     }
-      jt.heartbeat(status, false, initialContact, false, responseId);
+      jt.heartbeat(status, false, initialContact, acceptNewTasks, responseId);
       return ++responseId ;
   }
   
   static void establishFirstContact(JobTracker jt, String tracker) 
     throws IOException {
-    sendHeartBeat(jt, null, true, tracker, (short) 0);
+    sendHeartBeat(jt, null, true, false, tracker, (short) 0);
   }
 
+  static class FakeTaskInProgress extends TaskInProgress {
+
+    public FakeTaskInProgress(JobID jobId, String jobFile, int numMaps,
+        int partition, JobTracker jobTracker, JobConf conf, JobInProgress job,
+        int numSlotsRequired) {
+      super(jobId, jobFile, numMaps, partition, jobTracker, conf, job,
+          numSlotsRequired);
+    }
+
+    public FakeTaskInProgress(JobID jobId, String jobFile, RawSplit emptySplit,
+        JobTracker jobTracker, JobConf jobConf,
+        JobInProgress job, int partition, int numSlotsRequired) {
+      super(jobId, jobFile, emptySplit, jobTracker, jobConf, job,
+            partition, numSlotsRequired);
+    }
+
+    @Override
+    synchronized boolean updateStatus(TaskStatus status) {
+      TaskAttemptID taskid = status.getTaskID();
+      taskStatuses.put(taskid, status);
+      return false;
+    }
+  }
+  
   static class FakeJobHistory extends JobHistory {
     @Override
     public void init(JobTracker jt, 

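With the extra acceptNewTasks flag in sendHeartBeat, callers other than establishFirstContact can now simulate a tracker that is, or is not, willing to take new work. A hedged usage fragment for a test in the same package; jt, status and tracker are assumed to come from the surrounding fake-cluster setup:

  // Hypothetical test fragment: first contact refuses new tasks, the
  // follow-up heartbeat advertises capacity for them.
  void heartbeatTwice(JobTracker jt, TaskTrackerStatus status, String tracker)
      throws IOException {
    short responseId = FakeObjectUtilities.sendHeartBeat(
        jt, null, true /* initialContact */, false /* acceptNewTasks */,
        tracker, (short) 0);
    FakeObjectUtilities.sendHeartBeat(
        jt, status, false /* initialContact */, true /* acceptNewTasks */,
        tracker, responseId);
  }
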
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java Tue Oct 27 15:43:58 2009
@@ -96,7 +96,8 @@
 
   public static class NotificationServlet extends HttpServlet {
     public static int counter = 0;
-
+    private static final long serialVersionUID = 1L;
+    
     protected void doGet(HttpServletRequest req, HttpServletResponse res)
       throws ServletException, IOException {
       switch (counter) {

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java Tue Oct 27 15:43:58 2009
@@ -167,7 +167,7 @@
     
     Path[] outputFiles = FileUtil.stat2Paths(
         getFileSystem().listStatus(getOutputDir(),
-        new OutputLogFilter()));
+        new Utils.OutputFileUtils.OutputFilesFilter()));
     
     List<String> mapperOutput=getProcessed(input, mapperBadRecords);
     LOG.debug("mapperOutput " + mapperOutput.size());

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java Tue Oct 27 15:43:58 2009
@@ -64,7 +64,7 @@
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java Tue Oct 27 15:43:58 2009
@@ -127,7 +127,7 @@
     public void reduce(IntWritable key, Iterator<Writable> values, 
                        OutputCollector<IntWritable, Text> out,
                        Reporter reporter) throws IOException {
-      int currentKey = ((IntWritable)(key)).get();
+      int currentKey = key.get();
       // keys should be in descending order
       if (currentKey > lastKey) {
         fail("Keys not in sorted descending order");