Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC

svn commit: r1495297 [24/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java Fri Jun 21 06:37:27 2013
@@ -351,6 +351,11 @@ class Child {
     // Do the task-type specific localization
     task.localizeConfiguration(jobConf);
     
+    // Write files required to re-run the task with IsolationRunner
+    if (task.supportIsolationRunner(jobConf)) {
+      task.writeFilesRequiredForRerun(jobConf);
+    }
+    
     //write the localized task jobconf
     LocalDirAllocator lDirAlloc = 
       new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Fri Jun 21 06:37:27 2013
@@ -27,9 +27,10 @@ import org.apache.commons.logging.LogFac
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.security.UserGroupInformation;
 
-class CleanupQueue {
+public class CleanupQueue {
 
   public static final Log LOG =
     LogFactory.getLog(CleanupQueue.class);
@@ -56,10 +57,39 @@ class CleanupQueue {
   static class PathDeletionContext {
     final Path fullPath;// full path of file or dir
     final Configuration conf;
+    final UserGroupInformation ugi;
+    final JobID jobIdTokenRenewalToCancel;
 
     public PathDeletionContext(Path fullPath, Configuration conf) {
+      this(fullPath, conf, null, null);
+    }
+
+    public PathDeletionContext(Path fullPath, Configuration conf,
+        UserGroupInformation ugi) {
+      this(fullPath, conf, ugi, null);
+    }
+    
+    /**
+     * PathDeletionContext ctor which also allows for a job-delegation token
+     * renewal to be cancelled.
+     * 
+     * This is usually used at the end of a job to delete its final path and 
+     * to cancel renewal of its job-delegation token.
+     * 
+     * @param fullPath path to be deleted
+     * @param conf job configuration
+     * @param ugi ugi of the job to be used to delete the path
+     * @param jobIdTokenRenewalToCancel jobId of the job whose job-delegation
+     *                                  token renewal should be cancelled. No
+     *                                  cancellation is attempted if this is
+     *                                  <code>null</code>
+     */
+    public PathDeletionContext(Path fullPath, Configuration conf,
+        UserGroupInformation ugi, JobID jobIdTokenRenewalToCancel) {
       this.fullPath = fullPath;
       this.conf = conf;
+      this.ugi = ugi;
+      this.jobIdTokenRenewalToCancel = jobIdTokenRenewalToCancel;
     }
     
     protected Path getPathForCleanup() {
@@ -72,13 +102,20 @@ class CleanupQueue {
      */
     protected void deletePath() throws IOException, InterruptedException {
       final Path p = getPathForCleanup();
-      UserGroupInformation.getLoginUser().doAs(
+      (ugi == null ? UserGroupInformation.getLoginUser() : ugi).doAs(
           new PrivilegedExceptionAction<Object>() {
             public Object run() throws IOException {
              p.getFileSystem(conf).delete(p, true);
              return null;
             }
           });
+      
+      // Cancel renewal of job-delegation token if necessary
+      if (jobIdTokenRenewalToCancel != null && 
+          conf.getBoolean(JobContext.JOB_CANCEL_DELEGATION_TOKEN, true)) {
+        DelegationTokenRenewal.removeDelegationTokenRenewalForJob(
+            jobIdTokenRenewalToCancel);
+      }
     }
 
     @Override

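For reference, a minimal usage sketch (not part of this change) of the new four-argument PathDeletionContext constructor. It assumes the caller lives in org.apache.hadoop.mapred (PathDeletionContext is package-private) and that CleanupQueue exposes a varargs addToQueue(...) method elsewhere in the class, which is not shown in this hunk; jobDir, conf, jobUgi and jobId stand in for the values a caller such as the JobTracker would supply:

  package org.apache.hadoop.mapred;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.security.UserGroupInformation;

  class CleanupQueueUsageSketch {
    // Enqueue the final cleanup for a finished job: delete jobDir as the
    // job's user and, because jobId is non-null, also cancel renewal of its
    // job-delegation token (unless JobContext.JOB_CANCEL_DELEGATION_TOKEN
    // is set to false).
    static void enqueueJobCleanup(CleanupQueue queue, Path jobDir,
        Configuration conf, UserGroupInformation jobUgi, JobID jobId) {
      queue.addToQueue(   // addToQueue assumed present, see note above
          new CleanupQueue.PathDeletionContext(jobDir, conf, jobUgi, jobId));
    }
  }
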
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java Fri Jun 21 06:37:27 2013
@@ -329,6 +329,7 @@ public class ClusterStatus implements Wr
    * 
    * @return the size of heap memory used by the <code>JobTracker</code>
    */
+  @Deprecated
   public long getUsedMemory() {
     return used_memory;
   }
@@ -338,6 +339,7 @@ public class ClusterStatus implements Wr
    * 
    * @return the configured size of max heap memory that can be used by the <code>JobTracker</code>
    */
+  @Deprecated
   public long getMaxMemory() {
     return max_memory;
   }

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Counters.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Counters.java Fri Jun 21 06:37:27 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -59,18 +60,22 @@ public class Counters implements Writabl
   private static char[] charsToEscape =  {GROUP_OPEN, GROUP_CLOSE, 
                                           COUNTER_OPEN, COUNTER_CLOSE, 
                                           UNIT_OPEN, UNIT_CLOSE};
+  private static final JobConf conf = new JobConf();
   /** limit on the size of the name of the group **/
-  private static final int GROUP_NAME_LIMIT = 128;
+  private static final int GROUP_NAME_LIMIT = 
+    conf.getInt("mapreduce.job.counters.group.name.max", 128);
   /** limit on the size of the counter name **/
-  private static final int COUNTER_NAME_LIMIT = 64;
+  private static final int COUNTER_NAME_LIMIT = 
+    conf.getInt("mapreduce.job.counters.counter.name.max", 64);
   
-  private static final JobConf conf = new JobConf();
   /** limit on counters **/
   public static int MAX_COUNTER_LIMIT = 
-    conf.getInt("mapreduce.job.counters.limit", 120);
+    conf.getInt("mapreduce.job.counters.limit", // deprecated in 0.23
+        conf.getInt("mapreduce.job.counters.max", 120));
 
   /** the max groups allowed **/
-  static final int MAX_GROUP_LIMIT = 50;
+  public static final int MAX_GROUP_LIMIT = 
+    conf.getInt("mapreduce.job.counters.groups.max", 50);
   
   /** the number of current counters**/
   private int numCounters = 0;
@@ -163,7 +168,7 @@ public class Counters implements Writabl
     
     Group(String groupName) {
       try {
-        bundle = getResourceBundle(groupName);
+        bundle = CounterGroup.getResourceBundle(groupName);
       }
       catch (MissingResourceException neverMind) {
       }
@@ -291,7 +296,7 @@ public class Counters implements Writabl
      * @deprecated use {@link #getCounter(String)} instead
      */
     @Deprecated
-    public synchronized Counter getCounter(int id, String name) {
+    public Counter getCounter(int id, String name) {
       return getCounterForName(name);
     }
     
@@ -300,23 +305,27 @@ public class Counters implements Writabl
      * @param name the internal counter name
      * @return the counter
      */
-    public synchronized Counter getCounterForName(String name) {
-      String shortName = getShortName(name, COUNTER_NAME_LIMIT);
-      Counter result = subcounters.get(shortName);
-      if (result == null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Adding " + shortName);
-        }
-        numCounters = (numCounters == 0) ? Counters.this.size(): numCounters; 
-        if (numCounters >= MAX_COUNTER_LIMIT) {
-          throw new CountersExceededException("Error: Exceeded limits on number of counters - " 
-              + "Counters=" + numCounters + " Limit=" + MAX_COUNTER_LIMIT);
+    public Counter getCounterForName(String name) {
+      synchronized(Counters.this) { // lock ordering: Counters then Group
+        synchronized (this) {
+          String shortName = getShortName(name, COUNTER_NAME_LIMIT);
+          Counter result = subcounters.get(shortName);
+          if (result == null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Adding " + shortName);
+            }
+            numCounters = (numCounters == 0) ? Counters.this.size(): numCounters; 
+            if (numCounters >= MAX_COUNTER_LIMIT) {
+              throw new CountersExceededException("Error: Exceeded limits on number of counters - " 
+                  + "Counters=" + numCounters + " Limit=" + MAX_COUNTER_LIMIT);
+            }
+            result = new Counter(shortName, localize(shortName + ".name", shortName), 0L);
+            subcounters.put(shortName, result);
+            numCounters++;
+          }
+          return result;
         }
-        result = new Counter(shortName, localize(shortName + ".name", shortName), 0L);
-        subcounters.put(shortName, result);
-        numCounters++;
       }
-      return result;
     }
     
     /**
@@ -377,15 +386,6 @@ public class Counters implements Writabl
   private Map<Enum, Counter> cache = new IdentityHashMap<Enum, Counter>();
 
   /**
-   * Returns the specified resource bundle, or throws an exception.
-   * @throws MissingResourceException if the bundle isn't found
-   */
-  private static ResourceBundle getResourceBundle(String enumClassName) {
-    String bundleName = enumClassName.replace('$','_');
-    return ResourceBundle.getBundle(bundleName);
-  }
-
-  /**
    * Returns the names of all counter classes.
    * @return Set of counter names.
    */

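For reference, a minimal sketch (not part of this change) of the lookup order the now-configurable counter limits use. Note that the statics above are read from a JobConf created at class-load time, so the effective values come from the cluster-side configuration rather than an individual job's conf; the keys and defaults are the ones shown in the hunk:

  import org.apache.hadoop.conf.Configuration;

  public class CounterLimitSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      conf.setInt("mapreduce.job.counters.max", 200);      // new key
      // The deprecated key, mapreduce.job.counters.limit, wins if it is set:
      int maxCounters = conf.getInt("mapreduce.job.counters.limit",
          conf.getInt("mapreduce.job.counters.max", 120));
      int maxGroups = conf.getInt("mapreduce.job.counters.groups.max", 50);
      int counterNameLimit =
          conf.getInt("mapreduce.job.counters.counter.name.max", 64);
      System.out.println(maxCounters + " counters, " + maxGroups
          + " groups, counter names truncated at " + counterNameLimit + " chars");
    }
  }
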
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Jun 21 06:37:27 2013
@@ -185,8 +185,23 @@ public class DefaultTaskController exten
 
       String commandFile = writeCommand(cmdLine, rawFs, p).getAbsolutePath();
       rawFs.setPermission(p, TaskController.TASK_LAUNCH_SCRIPT_PERMISSION);
-      String[] commandArray = Shell.getRunCommand(commandFile, attemptId);
-      shExec = new ShellCommandExecutor(commandArray, currentWorkDirectory);
+      if (Shell.WINDOWS) {
+        String[] commandArray = Shell.getRunCommand(commandFile, attemptId);
+        shExec = new ShellCommandExecutor(commandArray, currentWorkDirectory);
+      } else {
+        /*
+         * MAPREDUCE-2374: if another thread fork(2)ed a child process during the
+         * window when writeCommand (above) had taskjvm.sh open for write, that
+         * child process might still have a writeable fd open to the script.
+         *
+         * If we run the script with "bash -c /path/to/taskjvm.sh", then bash
+         * would try to execve(2) the script and get ETXTBSY.  Instead, just have
+         * bash interpret the script with "bash /path/to/taskjvm.sh".
+         */
+        shExec = new ShellCommandExecutor(new String[]{
+            "bash", commandFile},
+            currentWorkDirectory);
+      }
       shExec.execute();
     } catch (Exception e) {
       if (shExec == null) {

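To make the MAPREDUCE-2374 comment concrete, here is a minimal sketch (not part of this change) of the launch form the POSIX branch now uses: with {"bash", "-c", script} the shell execve(2)s the script file, which can fail with ETXTBSY if a concurrently forked child still holds a writable fd to it, whereas {"bash", script} only opens the file for reading and interprets it. The script path and working directory below are placeholders:

  import java.io.File;
  import java.io.IOException;
  import org.apache.hadoop.util.Shell.ShellCommandExecutor;

  class TaskScriptLaunchSketch {
    // Launch a freshly written task script by letting bash interpret it,
    // avoiding the ETXTBSY race described above.
    static void launch(String commandFile, File workDir) throws IOException {
      ShellCommandExecutor shExec = new ShellCommandExecutor(
          new String[] {"bash", commandFile}, workDir);
      shExec.execute();
    }
  }
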
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileInputFormat_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileInputFormat_Counter.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileInputFormat_Counter.properties (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileInputFormat_Counter.properties Fri Jun 21 06:37:27 2013
@@ -1,3 +1,16 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+
 # ResourceBundle properties file for file-input-format counters
 
 CounterGroupName=                  File Input Format Counters 

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileOutputFormat_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileOutputFormat_Counter.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileOutputFormat_Counter.properties (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileOutputFormat_Counter.properties Fri Jun 21 06:37:27 2013
@@ -1,3 +1,16 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+
 # ResourceBundle properties file for file-output-format counters
 
 CounterGroupName=                  File Output Format Counters 

Added: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java (added)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.mapred.JobTracker.SafeModeAction;
+
+public class HDFSMonitorThread extends Thread {
+  
+  public static final Log LOG = LogFactory.getLog(HDFSMonitorThread.class);
+  
+  private final JobTracker jt;
+  private final FileSystem fs;
+  
+  private final int hdfsMonitorInterval;
+  
+  public HDFSMonitorThread(Configuration conf, JobTracker jt, FileSystem fs) {
+    super("JT-HDFS-Monitor-Thread");
+    this.jt = jt;
+    this.fs = fs;
+    this.hdfsMonitorInterval = 
+        conf.getInt(
+            JobTracker.JT_HDFS_MONITOR_THREAD_INTERVAL, 
+            JobTracker.DEFAULT_JT_HDFS_MONITOR_THREAD_INTERVAL_MS);
+    setDaemon(true);
+  }
+
+  @Override
+  public void run() {
+    
+    LOG.info("Starting HDFS Health Monitoring...");
+    
+    boolean previouslyHealthy = true;
+    boolean done = false;
+    
+    while (!done && !isInterrupted()) {
+      
+      boolean currentlyHealthy = DistributedFileSystem.isHealthy(fs.getUri());
+      if (currentlyHealthy != previouslyHealthy) {
+        
+        JobTracker.SafeModeAction action; 
+        if (currentlyHealthy) {
+          action = SafeModeAction.SAFEMODE_LEAVE;
+          LOG.info("HDFS healthy again, instructing JobTracker to leave " +
+              "'safemode' ...");
+        } else {
+          action = SafeModeAction.SAFEMODE_ENTER;
+          LOG.info("HDFS is unhealthy, instructing JobTracker to enter " +
+              "'safemode' ...");
+        }
+        
+        try {
+          if (jt.isInAdminSafeMode()) {
+            // Don't override admin-set safemode
+            LOG.info("JobTracker is in admin-set safemode, not overriding " +
+            		"through " + action);
+           previouslyHealthy = currentlyHealthy; 
+          } else {
+            previouslyHealthy = !(jt.setSafeModeInternal(action)); 
+                                                         //safemode => !healthy
+          }
+        } catch (IOException ioe) {
+          LOG.info("Failed to setSafeMode with action " + action, ioe);
+        }
+      }
+      
+      try {
+        Thread.sleep(hdfsMonitorInterval);
+      } catch (InterruptedException e) {
+        done = true;
+      }
+    }
+    
+    LOG.info("Stoping HDFS Health Monitoring...");
+  }
+  
+}

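A sketch (not part of this hunk) of how the new monitor thread might be wired up during JobTracker startup; the conf, jt and fs parameters are assumed to be the JobTracker's configuration, the JobTracker itself, and the FileSystem it monitors:

  package org.apache.hadoop.mapred;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;

  class HdfsMonitorWiringSketch {
    static Thread startHdfsMonitor(Configuration conf, JobTracker jt,
        FileSystem fs) {
      HDFSMonitorThread monitor = new HDFSMonitorThread(conf, jt, fs);
      monitor.start();  // daemon thread: toggles JobTracker safemode as HDFS
                        // health changes, unless an admin set safemode manually
      return monitor;   // interrupt() it at shutdown to stop the polling loop
    }
  }
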
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFile.java Fri Jun 21 06:37:27 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -291,7 +292,7 @@ class IFile {
                   CompressionCodec codec,
                   Counters.Counter readsCounter) throws IOException {
       readRecordsCounter = readsCounter;
-      checksumIn = new IFileInputStream(in,length);
+      checksumIn = new IFileInputStream(in,length, conf);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
         this.in = codec.createInputStream(checksumIn, decompressor);
@@ -325,7 +326,8 @@ class IFile {
     private int readData(byte[] buf, int off, int len) throws IOException {
       int bytesRead = 0;
       while (bytesRead < len) {
-        int n = in.read(buf, off+bytesRead, len-bytesRead);
+        int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
+            len - bytesRead);
         if (n < 0) {
           return bytesRead;
         }

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java Fri Jun 21 06:37:27 2013
@@ -19,11 +19,20 @@
 package org.apache.hadoop.mapred;
 
 import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.util.DataChecksum;
 /**
  * A checksum input stream, used for IFiles.
@@ -32,7 +41,8 @@ import org.apache.hadoop.util.DataChecks
 
 class IFileInputStream extends InputStream {
   
-  private final InputStream in; //The input stream to be verified for checksum. 
+  private final InputStream in; //The input stream to be verified for checksum.
+  private final FileDescriptor inFd; // the file descriptor, if it is known
   private final long length; //The total length of the input file
   private final long dataLength;
   private DataChecksum sum;
@@ -40,19 +50,66 @@ class IFileInputStream extends InputStre
   private final byte b[] = new byte[1];
   private byte csum[] = null;
   private int checksumSize;
-  
+
+  private ReadaheadRequest curReadahead = null;
+  private ReadaheadPool raPool = ReadaheadPool.getInstance();
+  private boolean readahead;
+  private int readaheadLength;
+
+  /**
+   * Configuration key to enable/disable IFile readahead.
+   */
+  public static final String MAPRED_IFILE_READAHEAD =
+    "mapreduce.ifile.readahead";
+
+  public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes.
+   */
+  public static final String MAPRED_IFILE_READAHEAD_BYTES =
+    "mapreduce.ifile.readahead.bytes";
+
+  public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+    4 * 1024 * 1024;
+
+  public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
   /**
    * Create a checksum input stream that reads
    * @param in The input stream to be verified for checksum.
    * @param len The length of the input stream including checksum bytes.
    */
-  public IFileInputStream(InputStream in, long len) {
+  public IFileInputStream(InputStream in, long len, Configuration conf) {
     this.in = in;
+    this.inFd = getFileDescriptorIfAvail(in);
     sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
         Integer.MAX_VALUE);
     checksumSize = sum.getChecksumSize();
     length = len;
     dataLength = length - checksumSize;
+
+    conf = (conf != null) ? conf : new Configuration();
+    readahead = conf.getBoolean(MAPRED_IFILE_READAHEAD,
+        DEFAULT_MAPRED_IFILE_READAHEAD);
+    readaheadLength = conf.getInt(MAPRED_IFILE_READAHEAD_BYTES,
+        DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
+
+    doReadahead();
+  }
+
+  private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+    FileDescriptor fd = null;
+    try {
+      if (in instanceof HasFileDescriptor) {
+        fd = ((HasFileDescriptor)in).getFileDescriptor();
+      } else if (in instanceof FileInputStream) {
+        fd = ((FileInputStream)in).getFD();
+      }
+    } catch (IOException e) {
+      LOG.info("Unable to determine FileDescriptor", e);
+    }
+    return fd;
   }
 
   /**
@@ -61,6 +118,10 @@ class IFileInputStream extends InputStre
    */
   @Override
   public void close() throws IOException {
+
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
     if (currentOffset < dataLength) {
       byte[] t = new byte[Math.min((int)
             (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
@@ -97,10 +158,21 @@ class IFileInputStream extends InputStre
     if (currentOffset >= dataLength) {
       return -1;
     }
-    
+
+    doReadahead();
+
     return doRead(b,off,len);
   }
 
+  private void doReadahead() {
+    if (raPool != null && inFd != null && readahead) {
+      curReadahead = raPool.readaheadStream(
+          "ifile", inFd,
+          currentOffset, readaheadLength, dataLength,
+          curReadahead);
+    }
+  }
+
   /**
    * Read bytes from the stream.
    * At EOF, checksum is validated and sent back

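For reference, a minimal sketch (not part of this change) of the two readahead knobs introduced above, shown with their default values. An IFileInputStream built with this configuration issues readahead through ReadaheadPool whenever the wrapped stream exposes a FileDescriptor, and passing a null conf falls back to a fresh Configuration, i.e. these same defaults:

  import org.apache.hadoop.conf.Configuration;

  public class IFileReadaheadConfigSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      conf.setBoolean("mapreduce.ifile.readahead", true);              // on/off switch
      conf.setInt("mapreduce.ifile.readahead.bytes", 4 * 1024 * 1024); // window size
    }
  }
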
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Jun 21 06:37:27 2013
@@ -72,8 +72,9 @@ interface InterTrackerProtocol extends V
    * Version 29: Adding available memory and CPU usage information on TT to
    *             TaskTrackerStatus for MAPREDUCE-1218
    * Version 30: Adding disk failure to TaskTrackerStatus for MAPREDUCE-3015
+   * Version 31: Adding version methods for HADOOP-8209
    */
-  public static final long versionID = 30L;
+  public static final long versionID = 31L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
@@ -144,9 +145,13 @@ interface InterTrackerProtocol extends V
    */
   public String getSystemDir();
   
-  
   /**
-   * Returns the buildVersion of the JobTracker 
+   * Returns the VersionInfo build version of the JobTracker 
    */
   public String getBuildVersion() throws IOException;
+
+  /**
+   * Returns the VersionInfo version of the JobTracker
+   */
+  public String getVIVersion() throws IOException;
 }

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JSPUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JSPUtil.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JSPUtil.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JSPUtil.java Fri Jun 21 06:37:27 2013
@@ -101,36 +101,43 @@ class JSPUtil {
     final JobInProgress job = jt.getJob(jobid);
     JobWithViewAccessCheck myJob = new JobWithViewAccessCheck(job);
 
+    if (!jt.areACLsEnabled() || job == null) {
+      return myJob;
+    }
+    
     String user = request.getRemoteUser();
-    if (user != null && job != null && jt.areACLsEnabled()) {
-      final UserGroupInformation ugi =
-        UserGroupInformation.createRemoteUser(user);
-      try {
-        ugi.doAs(new PrivilegedExceptionAction<Void>() {
-          public Void run() throws IOException, ServletException {
-
-            // checks job view permission
-            jt.getACLsManager().checkAccess(job, ugi,
-                Operation.VIEW_JOB_DETAILS);
-            return null;
-          }
-        });
-      } catch (AccessControlException e) {
-        String errMsg = "User " + ugi.getShortUserName() +
-            " failed to view " + jobid + "!<br><br>" + e.getMessage() +
-            "<hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>";
-        JSPUtil.setErrorAndForward(errMsg, request, response);
-        myJob.setViewAccess(false);
-      } catch (InterruptedException e) {
-        String errMsg = " Interrupted while trying to access " + jobid +
-        "<hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>";
-        JSPUtil.setErrorAndForward(errMsg, request, response);
-        myJob.setViewAccess(false);
-      }
+    if (user == null) {
+      JSPUtil.setErrorAndForward("Null user", request, response);
+      myJob.setViewAccess(false);
+      return myJob;
+    }
+    
+    final UserGroupInformation ugi = 
+      UserGroupInformation.createRemoteUser(user);
+    try {
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws IOException, ServletException {
+          // checks job view permission
+          jt.getACLsManager().checkAccess(job, ugi,
+              Operation.VIEW_JOB_DETAILS);
+          return null;
+        }
+      });
+    } catch (AccessControlException e) {
+      String errMsg = "User " + ugi.getShortUserName() +
+          " failed to view " + jobid + "!<br><br>" + e.getMessage() +
+          "<hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>";
+      JSPUtil.setErrorAndForward(errMsg, request, response);
+      myJob.setViewAccess(false);
+    } catch (InterruptedException e) {
+      String errMsg = " Interrupted while trying to access " + jobid +
+      "<hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>";
+      JSPUtil.setErrorAndForward(errMsg, request, response);
+      myJob.setViewAccess(false);
     }
     return myJob;
   }
-
+  
   /**
    * Sets error code SC_UNAUTHORIZED in response and forwards to
    * error page which contains error message and a back link.

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Jun 21 06:37:27 2013
@@ -40,6 +40,7 @@ import java.security.PrivilegedException
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -59,6 +60,10 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.Counters.Counter;
@@ -426,9 +431,17 @@ public class JobClient extends Configure
       ensureFreshStatus();
       return status.getFailureInfo();
     }
+
+    @Override
+    public JobStatus getJobStatus() throws IOException {
+      updateStatus();
+      return status;
+    }
   }
 
+  private JobSubmissionProtocol rpcJobSubmitClient;
   private JobSubmissionProtocol jobSubmitClient;
+  
   private Path sysDir = null;
   private Path stagingAreaDir = null;
   
@@ -439,6 +452,15 @@ public class JobClient extends Configure
   private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
   static int tasklogtimeout;
 
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
+      "mapreduce.jobclient.retry.policy.enabled";
+  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = 
+      false;
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
+      "mapreduce.jobclient.retry.policy.spec";
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
+      "10000,6,60000,10"; //t1,n1,t2,n2,...
+  
   /**
    * Create a job client.
    */
@@ -471,16 +493,72 @@ public class JobClient extends Configure
       conf.setNumMapTasks(1);
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
-      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
+      this.rpcJobSubmitClient = 
+          createRPCProxy(JobTracker.getAddress(conf), conf);
+      this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
     }        
   }
 
   private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
       Configuration conf) throws IOException {
-    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
-        JobSubmissionProtocol.versionID, addr, 
-        UserGroupInformation.getCurrentUser(), conf,
-        NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+    
+    JobSubmissionProtocol rpcJobSubmitClient = 
+        (JobSubmissionProtocol)RPC.getProxy(
+            JobSubmissionProtocol.class,
+            JobSubmissionProtocol.versionID, addr, 
+            UserGroupInformation.getCurrentUser(), conf,
+            NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class), 
+            0,
+            RetryUtils.getMultipleLinearRandomRetry(
+                conf,
+                MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY,
+                MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+                MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
+                MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT
+                ),
+            false);
+    
+    return rpcJobSubmitClient;
+  }
+
+  private static JobSubmissionProtocol createProxy(
+      JobSubmissionProtocol rpcJobSubmitClient,
+      Configuration conf) throws IOException {
+
+    /*
+     * Default is to retry on JobTrackerNotYetInitializedException
+     * i.e. wait for JobTracker to get to RUNNING state and for
+     * SafeModeException
+     */
+    @SuppressWarnings("unchecked")
+    RetryPolicy defaultPolicy = 
+        RetryUtils.getDefaultRetryPolicy(
+            conf,
+            MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY,
+            MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+            MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
+            MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+            JobTrackerNotYetInitializedException.class,
+            SafeModeException.class
+            ); 
+
+    /*
+     * Method specific retry policies for killJob and killTask...
+     *
+     * No retries on any exception including
+     * ConnectionException and SafeModeException
+     */
+    Map<String,RetryPolicy> methodNameToPolicyMap = 
+        new HashMap<String,RetryPolicy>();
+    methodNameToPolicyMap.put("killJob", RetryPolicies.TRY_ONCE_THEN_FAIL);
+    methodNameToPolicyMap.put("killTask", RetryPolicies.TRY_ONCE_THEN_FAIL);
+    
+    final JobSubmissionProtocol jsp = (JobSubmissionProtocol) RetryProxy.create(
+        JobSubmissionProtocol.class,
+        rpcJobSubmitClient, defaultPolicy, methodNameToPolicyMap);
+    RPC.checkVersion(JobSubmissionProtocol.class,
+        JobSubmissionProtocol.versionID, jsp);
+    return jsp;
   }
 
   @InterfaceAudience.Private
@@ -496,7 +574,7 @@ public class JobClient extends Configure
     public long renew(Token<?> token, Configuration conf
                       ) throws IOException, InterruptedException {
       InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
-      JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+      JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf);
       return jt.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
     }
 
@@ -505,7 +583,7 @@ public class JobClient extends Configure
     public void cancel(Token<?> token, Configuration conf
                        ) throws IOException, InterruptedException {
       InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
-      JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+      JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf);
       jt.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
     }
 
@@ -531,15 +609,16 @@ public class JobClient extends Configure
   public JobClient(InetSocketAddress jobTrackAddr, 
                    Configuration conf) throws IOException {
     this.ugi = UserGroupInformation.getCurrentUser();
-    jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
+    rpcJobSubmitClient = createRPCProxy(jobTrackAddr, conf); 
+    jobSubmitClient = createProxy(rpcJobSubmitClient, conf);
   }
 
   /**
    * Close the <code>JobClient</code>.
    */
   public synchronized void close() throws IOException {
-    if (!(jobSubmitClient instanceof LocalJobRunner)) {
-      RPC.stopProxy(jobSubmitClient);
+    if (!(rpcJobSubmitClient instanceof LocalJobRunner)) {
+      RPC.stopProxy(rpcJobSubmitClient);
     }
   }
 
@@ -759,10 +838,9 @@ public class JobClient extends Configure
     
     // First we check whether the cached archives and files are legal.
     TrackerDistributedCacheManager.validate(job);
-    //  set the timestamps of the archives and files
-    TrackerDistributedCacheManager.determineTimestamps(job);
-    //  set the public/private visibility of the archives and files
-    TrackerDistributedCacheManager.determineCacheVisibilities(job);
+    //  set the timestamps of the archives and files and set the
+    //  public/private visibility of the archives and files
+    TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job);
     // get DelegationTokens for cache files
     TrackerDistributedCacheManager.getDelegationTokens(job, 
                                                        job.getCredentials());
@@ -774,12 +852,20 @@ public class JobClient extends Configure
       if ("".equals(job.getJobName())){
         job.setJobName(new Path(originalJarPath).getName());
       }
-      Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
-      job.setJar(submitJarFile.toString());
-      fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
-      fs.setReplication(submitJarFile, replication);
-      fs.setPermission(submitJarFile, 
-          new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+      Path originalJarFile = new Path(originalJarPath);
+      URI jobJarURI = originalJarFile.toUri();
+      // If the job jar is already in fs, we don't need to copy it from local fs
+      if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
+              || !(jobJarURI.getScheme().equals(fs.getUri().getScheme())
+                  && jobJarURI.getAuthority().equals(
+                                            fs.getUri().getAuthority()))) {
+        Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
+        job.setJar(submitJarFile.toString());
+        fs.copyFromLocalFile(originalJarFile, submitJarFile);
+        fs.setReplication(submitJarFile, replication);
+        fs.setPermission(submitJarFile, 
+            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+      }
     } else {
       LOG.warn("No job jar file set.  User classes may not be found. "+
                "See JobConf(Class) or JobConf#setJar(String).");
@@ -909,6 +995,12 @@ public class JobClient extends Configure
             FileSystem.create(fs, submitJobFile,
                 new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
 
+          // removing jobtoken referrals before copying the jobconf to HDFS
+          // as the tasks don't need this setting, actually they may break
+          // because of it if present as the referral will point to a
+          // different job.
+          TokenCache.cleanUpTokenReferral(jobCopy);
+
           try {
             jobCopy.writeXml(out);
           } finally {

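For reference, a minimal sketch (not part of this change) of enabling the new client-side retry policy. Per the t1,n1,t2,n2 comment above, the spec is a list of (sleep-millis, retry-count) pairs, so the default "10000,6,60000,10" means roughly: retry up to 6 times at 10-second intervals, then up to 10 more times at 60-second intervals; killJob and killTask stay TRY_ONCE_THEN_FAIL regardless:

  import org.apache.hadoop.conf.Configuration;

  public class JobClientRetrySketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      conf.setBoolean("mapreduce.jobclient.retry.policy.enabled", true);
      conf.set("mapreduce.jobclient.retry.policy.spec", "10000,6,60000,10");
      // A JobClient created from this conf wraps its JobSubmissionProtocol
      // proxy in a RetryProxy configured with the policy above.
    }
  }
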
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Jun 21 06:37:27 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.lib.Hash
 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.ClassUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 
@@ -319,6 +320,26 @@ public class JobConf extends Configurati
   public static final String MAPRED_REDUCE_TASK_ENV =
     "mapred.reduce.child.env";
 
+  public static final String WORKFLOW_ID = "mapreduce.workflow.id";
+
+  public static final String WORKFLOW_NAME = "mapreduce.workflow.name";
+
+  public static final String WORKFLOW_NODE_NAME =
+      "mapreduce.workflow.node.name";
+
+  public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
+      "mapreduce.workflow.adjacency.";
+
+  public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
+      "^mapreduce\\.workflow\\.adjacency\\..+";
+
+  public static final String WORKFLOW_TAGS = "mapreduce.workflow.tags";
+
+  public static final String MAPREDUCE_RECOVER_JOB = 
+      "mapreduce.job.restart.recover";
+
+  public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true; 
+
   private Credentials credentials = new Credentials();
   
   /**
@@ -429,7 +450,7 @@ public class JobConf extends Configurati
    * @param cls the example class.
    */
   public void setJarByClass(Class cls) {
-    String jar = findContainingJar(cls);
+    String jar = ClassUtil.findContainingJar(cls);
     if (jar != null) {
       setJar(jar);
     }   
@@ -1797,38 +1818,6 @@ public class JobConf extends Configurati
     return 
     (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
   }
-  
-  /** 
-   * Find a jar that contains a class of the same name, if any.
-   * It will return a jar file, even if that is not the first thing
-   * on the class path that has a class with the same name.
-   * 
-   * @param my_class the class to find.
-   * @return a jar file that contains the class, or null.
-   * @throws IOException
-   */
-  private static String findContainingJar(Class my_class) {
-    ClassLoader loader = my_class.getClassLoader();
-    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
-    try {
-      for(Enumeration itr = loader.getResources(class_file);
-          itr.hasMoreElements();) {
-        URL url = (URL) itr.nextElement();
-        if ("jar".equals(url.getProtocol())) {
-          String toReturn = url.getPath();
-          if (toReturn.startsWith("file:")) {
-            toReturn = toReturn.substring("file:".length());
-          }
-          toReturn = URLDecoder.decode(toReturn, "UTF-8");
-          return toReturn.replaceAll("!.*$", "");
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return null;
-  }
-
 
   /**
    * Get the memory required to run a task of this job, in bytes. See

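For reference, a minimal sketch (not part of this change) of how a workflow engine might tag a submitted job with the new workflow properties so that JobHistory can record them; the workflow id, node names, and the adjacency value format ("B,C") are illustrative assumptions, not something this commit prescribes:

  import org.apache.hadoop.mapred.JobConf;

  public class WorkflowTaggingSketch {
    public static void main(String[] args) {
      JobConf job = new JobConf();
      job.set(JobConf.WORKFLOW_ID, "wf-0001");
      job.set(JobConf.WORKFLOW_NAME, "nightly-etl");
      job.set(JobConf.WORKFLOW_NODE_NAME, "A");
      job.set(JobConf.WORKFLOW_TAGS, "etl,nightly");
      // One adjacency entry per node: here node A feeds nodes B and C.
      job.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "A", "B,C");
    }
  }
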
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Jun 21 06:37:27 2013
@@ -119,8 +119,6 @@ public class JobHistory {
   
   public static final int JOB_NAME_TRIM_LENGTH = 50;
   private static String JOBTRACKER_UNIQUE_STRING = null;
-  private static final String JOBHISTORY_DEBUG_MODE =
-    "mapreduce.jobhistory.debug.mode";
   private static String LOG_DIR = null;
   private static final String SECONDARY_FILE_SUFFIX = ".recover";
   private static long jobHistoryBlockSize = 0;
@@ -131,17 +129,15 @@ public class JobHistory {
   final static FsPermission HISTORY_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0744); // rwxr--r--
   private static FileSystem LOGDIR_FS; // log dir filesystem
-  private static FileSystem DONEDIR_FS; // Done dir filesystem
+  protected static FileSystem DONEDIR_FS; // Done dir filesystem
   private static JobConf jtConf;
-  private static Path DONE = null; // folder for completed jobs
+  protected static Path DONE = null; // folder for completed jobs
   private static String DONE_BEFORE_SERIAL_TAIL = doneSubdirsBeforeSerialTail();
   private static String DONE_LEAF_FILES = DONE_BEFORE_SERIAL_TAIL + "/*";
   private static boolean aclsEnabled = false;
 
   static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
 
-  // XXXXX debug mode -- set this to false for production
-  private static boolean DEBUG_MODE;
   private static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
   private static int SERIAL_NUMBER_LOW_DIGITS;
 
@@ -225,8 +221,11 @@ public class JobHistory {
 
 
     void start() {
-      executor = new ThreadPoolExecutor(1, 3, 1, 
+      executor = new ThreadPoolExecutor(5, 5, 1, 
           TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
+      // allow core threads to terminate if there has been no work
+      // for the keepalive period.
+      executor.allowCoreThreadTimeOut(true);
     }
 
     private FilesHolder getFileHolder(JobID id) {
@@ -368,8 +367,8 @@ public class JobHistory {
            timestamp.get(Calendar.YEAR),
            // months are 0-based in Calendar, but people will expect January
            // to be month #1.
-           timestamp.get(DEBUG_MODE ? Calendar.HOUR : Calendar.MONTH) + 1,
-           timestamp.get(DEBUG_MODE ? Calendar.MINUTE : Calendar.DAY_OF_MONTH));
+            timestamp.get(Calendar.MONTH) + 1,
+            timestamp.get(Calendar.DAY_OF_MONTH));
 
         dateString = dateString.intern();
 
@@ -391,9 +390,9 @@ public class JobHistory {
 
     synchronized (existingDoneSubdirs) {
       if (existingDoneSubdirs.contains(dir)) {
-        if (DEBUG_MODE && !DONEDIR_FS.exists(dir)) {
-          System.err.println("JobHistory.maybeMakeSubdirectory -- We believed "
-                             + dir + " already existed, but it didn't.");
+        if (LOG.isDebugEnabled() && !DONEDIR_FS.exists(dir)) {
+          LOG.error("JobHistory.maybeMakeSubdirectory -- We believed " + dir
+              + " already existed, but it didn't.");
         }
           
         return true;
@@ -411,9 +410,9 @@ public class JobHistory {
 
         return false;
       } else {
-        if (DEBUG_MODE) {
-          System.err.println("JobHistory.maybeMakeSubdirectory -- We believed "
-                             + dir + " didn't already exist, but it did.");
+        if (LOG.isDebugEnabled()) {
+          LOG.error("JobHistory.maybeMakeSubdirectory -- We believed " + dir
+              + " didn't already exist, but it did.");
         }
 
         return false;
@@ -476,7 +475,9 @@ public class JobHistory {
     ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
     SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, 
     TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
-    VIEW_JOB, MODIFY_JOB, JOB_QUEUE, FAIL_REASON
+    VIEW_JOB, MODIFY_JOB, JOB_QUEUE, FAIL_REASON, LOCALITY, AVATAAR,
+    WORKFLOW_ID, WORKFLOW_NAME, WORKFLOW_NODE_NAME, WORKFLOW_ADJACENCIES,
+    WORKFLOW_TAGS
   }
 
   /**
@@ -497,8 +498,7 @@ public class JobHistory {
   public static void init(JobTracker jobTracker, JobConf conf,
              String hostname, long jobTrackerStartTime) throws IOException {
     initLogDir(conf);
-    DEBUG_MODE = conf.getBoolean(JOBHISTORY_DEBUG_MODE, false);
-    SERIAL_NUMBER_LOW_DIGITS = DEBUG_MODE ? 1 : 3;
+    SERIAL_NUMBER_LOW_DIGITS = 3;
     SERIAL_NUMBER_FORMAT = ("%0"
        + (SERIAL_NUMBER_DIRECTORY_DIGITS + SERIAL_NUMBER_LOW_DIGITS)
        + "d");
@@ -547,8 +547,9 @@ public class JobHistory {
     String doneLocation = conf.
                      get("mapred.job.tracker.history.completed.location");
     if (doneLocation != null) {
-      DONE = fs.makeQualified(new Path(doneLocation));
-      DONEDIR_FS = fs;
+      Path donePath = new Path(doneLocation);
+      DONEDIR_FS = donePath.getFileSystem(conf);
+      DONE = DONEDIR_FS.makeQualified(donePath);
     } else {
       if (!setup) {
         initLogDir(conf);
@@ -586,6 +587,19 @@ public class JobHistory {
     }
 
     fileManager.start();
+    
+    HistoryCleaner.cleanupFrequency =
+    	      conf.getLong("mapreduce.jobhistory.cleaner.interval-ms",
+    	      HistoryCleaner.DEFAULT_CLEANUP_FREQUENCY);
+    HistoryCleaner.maxAgeOfHistoryFiles =
+    	      conf.getLong("mapreduce.jobhistory.max-age-ms",
+    	      HistoryCleaner.DEFAULT_HISTORY_MAX_AGE);
+    LOG.info(String.format("Job History MaxAge is %d ms (%.2f days), " +
+    	      "Cleanup Frequency is %d ms (%.2f days)",
+    	      HistoryCleaner.maxAgeOfHistoryFiles,
+    	      ((float) HistoryCleaner.maxAgeOfHistoryFiles)/HistoryCleaner.ONE_DAY_IN_MS,
+    	      HistoryCleaner.cleanupFrequency,
+    	      ((float) HistoryCleaner.cleanupFrequency)/HistoryCleaner.ONE_DAY_IN_MS));
   }
 
   /**
@@ -789,6 +803,10 @@ public class JobHistory {
                   Keys[] keys, String[] values) {
     log(writers, recordType, keys, values, null);
   }
+
+  static class JobHistoryLogger {
+    static final Log LOG = LogFactory.getLog(JobHistoryLogger.class);
+  }
   
   /**
    * Log a number of keys and values with record. the array length of keys and values
@@ -822,15 +840,19 @@ public class JobHistory {
       builder.append(DELIMITER); 
     }
     builder.append(LINE_DELIMITER_CHAR);
-    
+
+    String logLine = builder.toString();
     for (Iterator<PrintWriter> iter = writers.iterator(); iter.hasNext();) {
       PrintWriter out = iter.next();
-      out.println(builder.toString());
+      out.println(logLine);
       if (out.checkError() && id != null) {
         LOG.info("Logging failed for job " + id + "removing PrintWriter from FileManager");
         iter.remove();
       }
     }
+    if (recordType != RecordTypes.Meta) {
+      JobHistoryLogger.LOG.debug(logLine);
+    }
   }
   
   /**
@@ -1269,6 +1291,34 @@ public class JobHistory {
     }
     
     /**
+     * Get the workflow adjacencies from the job conf
+     * The string returned is of the form "key"="value" "key"="value" ...
+     */
+    public static String getWorkflowAdjacencies(Configuration conf) {
+      int prefixLen = JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING.length();
+      Map<String,String> adjacencies = 
+          conf.getValByRegex(JobConf.WORKFLOW_ADJACENCY_PREFIX_PATTERN);
+      if (adjacencies.isEmpty())
+        return "";
+      int size = 0;
+      for (Entry<String,String> entry : adjacencies.entrySet()) {
+        int keyLen = entry.getKey().length();
+        size += keyLen - prefixLen;
+        size += entry.getValue().length() + 6;
+      }
+      StringBuilder sb = new StringBuilder(size);
+      for (Entry<String,String> entry : adjacencies.entrySet()) {
+        int keyLen = entry.getKey().length();
+        sb.append("\"");
+        sb.append(escapeString(entry.getKey().substring(prefixLen, keyLen)));
+        sb.append("\"=\"");
+        sb.append(escapeString(entry.getValue()));
+        sb.append("\" ");
+      }
+      return sb.toString();
+    }
+    
+    /**
      * Get the job history file path given the history filename
      */
     public static Path getJobHistoryLogLocation(String logFileName)
@@ -1390,17 +1440,16 @@ public class JobHistory {
       FileStatus[] statuses = null;
 
       if (dir == DONE) {
-        final String snDirectoryComponent
-          = serialNumberDirectoryComponent(id);
-
         final String scanTail
           = (DONE_BEFORE_SERIAL_TAIL
              + "/" + serialNumberDirectoryComponent(id));
 
-        if (DEBUG_MODE) {
-          System.err.println("JobHistory.getJobHistoryFileName DONE dir: scanning " + scanTail);
-
-          (new IOException("debug exception")).printStackTrace(System.err);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("JobHistory.getJobHistoryFileName DONE dir: scanning "
+              + scanTail);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(Thread.currentThread().getStackTrace());
+          }
         }
 
         statuses = localGlobber(fs, DONE, scanTail, filter);
@@ -1410,9 +1459,6 @@ public class JobHistory {
       
       String filename = null;
       if (statuses == null || statuses.length == 0) {
-        if (DEBUG_MODE) {
-          System.err.println("Nothing to recover for job " + id);
-        }
         LOG.info("Nothing to recover for job " + id);
       } else {
         // return filename considering that fact the name can be a 
@@ -1735,11 +1781,21 @@ public class JobHistory {
                        new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER,
                                   Keys.SUBMIT_TIME, Keys.JOBCONF,
                                   Keys.VIEW_JOB, Keys.MODIFY_JOB,
-                                  Keys.JOB_QUEUE}, 
+                                  Keys.JOB_QUEUE, Keys.WORKFLOW_ID,
+                                  Keys.WORKFLOW_NAME, Keys.WORKFLOW_NODE_NAME,
+                                  Keys.WORKFLOW_ADJACENCIES,
+                                  Keys.WORKFLOW_TAGS}, 
                        new String[]{jobId.toString(), jobName, user, 
                                     String.valueOf(submitTime) , jobConfPath,
                                     viewJobACL, modifyJobACL,
-                                    jobConf.getQueueName()}, jobId
+                                    jobConf.getQueueName(),
+                                    jobConf.get(JobConf.WORKFLOW_ID, ""),
+                                    jobConf.get(JobConf.WORKFLOW_NAME, ""),
+                                    jobConf.get(JobConf.WORKFLOW_NODE_NAME, ""),
+                                    getWorkflowAdjacencies(jobConf),
+                                    jobConf.get(JobConf.WORKFLOW_TAGS, ""),
+                                    }, 
+                                    jobId
                       ); 
            
       }catch(IOException e){
@@ -2140,7 +2196,14 @@ public class JobHistory {
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
       logStarted(taskAttemptId, startTime, hostName, -1, Values.MAP.name());
     }
-    
+
+    @Deprecated
+    public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
+        String trackerName, int httpPort, String taskType) {
+      logStarted(taskAttemptId, startTime, trackerName, httpPort, taskType, 
+          Locality.OFF_SWITCH, Avataar.VIRGIN);
+    }
+
     /**
      * Log start time of this map task attempt.
      *  
@@ -2148,25 +2211,31 @@ public class JobHistory {
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param trackerName name of the tracker executing the task attempt.
      * @param httpPort http port of the task tracker executing the task attempt
-     * @param taskType Whether the attempt is cleanup or setup or map 
+     * @param taskType Whether the attempt is cleanup or setup or map
+     * @param locality the data locality of the task attempt
+     * @param avataar the avataar of the task attempt
      */
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
                                   String trackerName, int httpPort, 
-                                  String taskType) {
+                                  String taskType,
+                                  Locality locality, Avataar avataar) {
       JobID id = taskAttemptId.getJobID();
       ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
       if (null != writer){
         JobHistory.log(writer, RecordTypes.MapAttempt, 
                        new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
-                                   Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
-                                   Keys.TRACKER_NAME, Keys.HTTP_PORT},
+                                   Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
+                                   Keys.TRACKER_NAME, Keys.HTTP_PORT,
+                                   Keys.LOCALITY, Keys.AVATAAR},
                        new String[]{taskType,
                                     taskAttemptId.getTaskID().toString(), 
                                     taskAttemptId.toString(), 
                                     String.valueOf(startTime), trackerName,
-                                    httpPort == -1 ? "" : 
-                                      String.valueOf(httpPort)}, id); 
+                                    httpPort == -1 ? "" : String.valueOf(httpPort),
+                                    locality.toString(), avataar.toString()},
+                       id
+                       ); 
       }
     }
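A hedged usage sketch for the widened MapAttempt.logStarted signature follows; it assumes the caller sits in org.apache.hadoop.mapred (so Locality and Avataar resolve), that JobHistory has already registered writers for the job, and that NODE_LOCAL is one of Locality's constants alongside OFF_SWITCH:

    package org.apache.hadoop.mapred;  // assumed, so package-local types resolve

    class MapAttemptStartLogger {
      static void recordStart(TaskAttemptID attemptId, String trackerName,
          int httpPort) {
        // Records the attempt start together with its locality and avataar.
        JobHistory.MapAttempt.logStarted(attemptId, System.currentTimeMillis(),
            trackerName, httpPort, JobHistory.Values.MAP.name(),
            Locality.NODE_LOCAL,   // assumed constant for a node-local attempt
            Avataar.VIRGIN);       // first (non-speculative) attempt
      }
    }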
     
@@ -2328,7 +2397,15 @@ public class JobHistory {
                                   long startTime, String hostName){
       logStarted(taskAttemptId, startTime, hostName, -1, Values.REDUCE.name());
     }
-    
+
+    @Deprecated
+    public static void logStarted(TaskAttemptID taskAttemptId, 
+        long startTime, String trackerName, 
+        int httpPort, 
+        String taskType) {
+      logStarted(taskAttemptId, startTime, trackerName, httpPort, taskType, 
+          Locality.OFF_SWITCH, Avataar.VIRGIN);
+    }
     /**
      * Log start time of  Reduce task attempt. 
      * 
@@ -2336,27 +2413,33 @@ public class JobHistory {
      * @param startTime start time
      * @param trackerName tracker name 
      * @param httpPort the http port of the tracker executing the task attempt
-     * @param taskType Whether the attempt is cleanup or setup or reduce 
+     * @param taskType Whether the attempt is cleanup or setup or reduce
+     * @param locality the data locality of the task attempt
+     * @param avataar the avataar of the task attempt
      */
     public static void logStarted(TaskAttemptID taskAttemptId, 
                                   long startTime, String trackerName, 
                                   int httpPort, 
-                                  String taskType) {
-              JobID id = taskAttemptId.getJobID();
-	      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
-
-              if (null != writer){
-		  JobHistory.log(writer, RecordTypes.ReduceAttempt, 
-				 new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
-					      Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
-					      Keys.TRACKER_NAME, Keys.HTTP_PORT},
-				 new String[]{taskType,
-					      taskAttemptId.getTaskID().toString(), 
-					      taskAttemptId.toString(), 
-					      String.valueOf(startTime), trackerName,
-					      httpPort == -1 ? "" : 
-						String.valueOf(httpPort)}, id); 
-              }
+                                  String taskType,
+                                  Locality locality, Avataar avataar) {
+      JobID id = taskAttemptId.getJobID();
+      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.ReduceAttempt,
+            new Keys[] {Keys.TASK_TYPE, Keys.TASKID,
+                Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
+                Keys.TRACKER_NAME, Keys.HTTP_PORT,
+                Keys.LOCALITY, Keys.AVATAAR},
+            new String[]{taskType,
+                taskAttemptId.getTaskID().toString(), 
+                taskAttemptId.toString(), 
+                String.valueOf(startTime), trackerName,
+                httpPort == -1 ? "" : 
+                String.valueOf(httpPort),
+                locality.toString(), avataar.toString()}, 
+            id); 
+      }
 	    }
 	    
 	    /**
@@ -2521,26 +2604,26 @@ public class JobHistory {
      */
     public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException; 
   }
-
-  static long directoryTime(String year, String seg2, String seg3) {
-    // set to current time.  In debug mode, this is where the month
-    // and day get set.
+  
+  /**
+   * Returns the time in milliseconds, truncated to the day.
+   */
+  static long directoryTime(String year, String month, String day) {
     Calendar result = Calendar.getInstance();
-    // canonicalize by filling in unset fields
-    result.setTimeInMillis(System.currentTimeMillis());
+    result.clear();
 
     result.set(Calendar.YEAR, Integer.parseInt(year));
 
     // months are 0-based in Calendar, but people will expect January
     // to be month #1 .  Therefore the number is bumped before we make the 
     // directory name and must be debumped to seek the time.
-    result.set(DEBUG_MODE ? Calendar.HOUR : Calendar.MONTH,
-               Integer.parseInt(seg2) - 1);
+    result.set(Calendar.MONTH, Integer.parseInt(month) - 1);
 
-    result.set(DEBUG_MODE ? Calendar.MINUTE : Calendar.DAY_OF_MONTH,
-               Integer.parseInt(seg3));
-
-    return result.getTimeInMillis();
+    result.set(Calendar.DAY_OF_MONTH, Integer.parseInt(day));
+    
+    // the calendar was cleared above, so only year/month/day are set and the
+    // result is already truncated to day granularity
+    return result.getTimeInMillis();
   }
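A small self-contained illustration of what directoryTime now computes for a done-directory date component such as 2013/06/21 (the date is an arbitrary example):

    import java.util.Calendar;

    public class DirectoryTimeDemo {
      // Mirrors JobHistory.directoryTime: clear() first so that only
      // year/month/day are set and the result has day granularity.
      static long directoryTime(String year, String month, String day) {
        Calendar result = Calendar.getInstance();
        result.clear();
        result.set(Calendar.YEAR, Integer.parseInt(year));
        // Calendar months are 0-based, directory names are 1-based.
        result.set(Calendar.MONTH, Integer.parseInt(month) - 1);
        result.set(Calendar.DAY_OF_MONTH, Integer.parseInt(day));
        return result.getTimeInMillis();
      }

      public static void main(String[] args) {
        // Prints the epoch millis for local midnight of 2013-06-21.
        System.out.println(directoryTime("2013", "06", "21"));
      }
    }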
   
   /**
@@ -2551,10 +2634,10 @@ public class JobHistory {
    */
   public static class HistoryCleaner implements Runnable {
     static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
-    static final long DIRECTORY_LIFE_IN_MS
-      = DEBUG_MODE ? 20 * 60 * 1000L : 30 * ONE_DAY_IN_MS;
-    static final long RUN_INTERVAL
-      = DEBUG_MODE ? 10L * 60L * 1000L : ONE_DAY_IN_MS;
+    static final long DEFAULT_HISTORY_MAX_AGE = 30 * ONE_DAY_IN_MS;
+    static final long DEFAULT_CLEANUP_FREQUENCY = ONE_DAY_IN_MS;
+    static long cleanupFrequency = DEFAULT_CLEANUP_FREQUENCY;
+    static long maxAgeOfHistoryFiles = DEFAULT_HISTORY_MAX_AGE;
     private long now; 
     private static final AtomicBoolean isRunning = new AtomicBoolean(false); 
     private static long lastRan = 0; 
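With the DEBUG_MODE-derived constants gone, the cleaner's age and frequency are plain static fields and clean(long) is callable directly, which makes the behaviour easy to exercise in a test. A hedged sketch: the class below is hypothetical and placed in org.apache.hadoop.mapred so the package-private fields are visible, and a real run still needs JobHistory.init(...) so the done directory is set up:

    package org.apache.hadoop.mapred;  // assumed, so the package-private knobs are visible

    public class HistoryCleanerTuningDemo {
      public static void main(String[] args) {
        // Shorten the knobs that replaced the old DEBUG_MODE-dependent constants.
        JobHistory.HistoryCleaner.maxAgeOfHistoryFiles = 60 * 60 * 1000L; // keep 1 hour
        JobHistory.HistoryCleaner.cleanupFrequency = 60 * 1000L;          // run at most once a minute
        // Prunes anything older than the (shortened) max age; assumes JobHistory
        // has been initialized so the done directory and its FileSystem exist.
        new JobHistory.HistoryCleaner().clean(System.currentTimeMillis());
      }
    }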
@@ -2571,12 +2654,15 @@ public class JobHistory {
       }
       now = System.currentTimeMillis();
       // clean history only once a day at max
-      if (lastRan != 0 && (now - lastRan) < RUN_INTERVAL) {
+      if (lastRan != 0 && (now - lastRan) < cleanupFrequency) {
         isRunning.set(false);
         return; 
       }
       lastRan = now;
-
+      clean(now);
+    }
+    
+    public void clean(long now) {
       Set<String> deletedPathnames = new HashSet<String>();
 
       // XXXXX debug code
@@ -2587,62 +2673,73 @@ public class JobHistory {
         Path[] datedDirectories
           = FileUtil.stat2Paths(localGlobber(DONEDIR_FS, DONE,
                                              DONE_BEFORE_SERIAL_TAIL, null));
-        // find directories older than 30 days
+
+        // any file with a timestamp earlier than cutoff should be deleted
+        long cutoff = now - maxAgeOfHistoryFiles;
+        Calendar cutoffDay = Calendar.getInstance();
+        cutoffDay.setTimeInMillis(cutoff);
+        cutoffDay.set(Calendar.HOUR_OF_DAY, 0);
+        cutoffDay.set(Calendar.MINUTE, 0);
+        cutoffDay.set(Calendar.SECOND, 0);
+        cutoffDay.set(Calendar.MILLISECOND, 0);
+        
+        // find directories older than the maximum age
         for (int i = 0; i < datedDirectories.length; ++i) {
           String thisDir = datedDirectories[i].toString();
           Matcher pathMatcher = parseDirectory.matcher(thisDir);
 
           if (pathMatcher.matches()) {
-            long dirTime = directoryTime(pathMatcher.group(1),
+            long dirDay = directoryTime(pathMatcher.group(1),
                                          pathMatcher.group(2),
                                          pathMatcher.group(3));
 
-            if (DEBUG_MODE) {
-              System.err.println("HistoryCleaner.run just parsed " + thisDir
-                                 + " as year/month/day = " + pathMatcher.group(1)
-                                 + "/" + pathMatcher.group(2) + "/"
-                                 + pathMatcher.group(3));
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("HistoryCleaner.run just parsed " + thisDir
+                  + " as year/month/day = " + pathMatcher.group(1) + "/"
+                  + pathMatcher.group(2) + "/" + pathMatcher.group(3));
             }
-
-            if (dirTime < now - DIRECTORY_LIFE_IN_MS) {
-
-              if (DEBUG_MODE) {
-                Calendar then = Calendar.getInstance();
-                then.setTimeInMillis(dirTime);
+            
+            if (dirDay <= cutoffDay.getTimeInMillis()) {
+              if (LOG.isDebugEnabled()) {
                 Calendar nnow = Calendar.getInstance();
                 nnow.setTimeInMillis(now);
+                Calendar then = Calendar.getInstance();
+                then.setTimeInMillis(dirDay);
                 
-                System.err.println("HistoryCleaner.run directory: " + thisDir
-                                   + " because its time is " + then
-                                   + " but it's now " + nnow);
-                System.err.println("then = " + dirTime);
-                System.err.println("now  = " + now);
+                LOG.debug("HistoryCleaner.run directory: " + thisDir
+                    + " because its time is " + then + " but it's now " + nnow);
               }
+            }
+            
+            // if dirDay is cutoffDay, some files may be old enough and others not
+            if (dirDay == cutoffDay.getTimeInMillis()) {
+              // remove old enough files in the directory
+              FileStatus[] possibleDeletees = DONEDIR_FS.listStatus(datedDirectories[i]);
+              
+              for (int j = 0; j < possibleDeletees.length; ++j) {
+                if (possibleDeletees[j].getModificationTime() < now -
+                    maxAgeOfHistoryFiles) {
+                  Path deletee = possibleDeletees[j].getPath();
+                  if (LOG.isDebugEnabled() && !printedOneDeletee) {
+                    LOG.debug("HistoryCleaner.run deletee: "
+                        + deletee.toString());
+                    printedOneDeletee = true;
+                  }
 
-              // remove every file in the directory and save the name
-              // so we can remove it from jobHistoryFileMap
-              Path[] deletees
-                = FileUtil.stat2Paths(localGlobber(DONEDIR_FS,
-                                                   datedDirectories[i],
-                                                   "/*/*", // sn + individual files
-                                                   null));
-
-              for (int j = 0; j < deletees.length; ++j) {
-
-                if (DEBUG_MODE && !printedOneDeletee) {
-                  System.err.println("HistoryCleaner.run deletee: " + deletees[j].toString());
-                  printedOneDeletee = true;
-                }
-
-                DONEDIR_FS.delete(deletees[j]);
-                deletedPathnames.add(deletees[j].toString());
+                  DONEDIR_FS.delete(deletee);
+                  deletedPathnames.add(deletee.toString());
+                }
               }
+            }
+
+            // if the directory is older than cutoffDay, we can flat out
+            // delete it because all the files in it are old enough
+            if (dirDay < cutoffDay.getTimeInMillis()) {
               synchronized (existingDoneSubdirs) {
-                if (!existingDoneSubdirs.contains(datedDirectories[i]))
-                  {
-                    LOG.warn("JobHistory: existingDoneSubdirs doesn't contain "
-                             + datedDirectories[i] + ", but should.");
-                  }
+                if (!existingDoneSubdirs.contains(datedDirectories[i])) {
+                  LOG.warn("JobHistory: existingDoneSubdirs doesn't contain "
+                      + datedDirectories[i] + ", but should.");
+                }
                 DONEDIR_FS.delete(datedDirectories[i], true);
                 existingDoneSubdirs.remove(datedDirectories[i]);
               }
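The directory-pruning logic above hinges on two instants: the raw cutoff for individual files and that cutoff truncated to local midnight for whole dated directories. A standalone sketch of just that computation (30 days is the default age used by this class):

    import java.util.Calendar;

    public class CutoffDayDemo {
      // Same idea as HistoryCleaner.clean(): anything stamped before "cutoff"
      // is old enough to delete, and cutoffDay is that instant truncated to
      // local midnight so whole dated directories can be compared by day.
      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long maxAgeOfHistoryFiles = 30 * 24 * 60 * 60 * 1000L; // 30 days

        long cutoff = now - maxAgeOfHistoryFiles;
        Calendar cutoffDay = Calendar.getInstance();
        cutoffDay.setTimeInMillis(cutoff);
        cutoffDay.set(Calendar.HOUR_OF_DAY, 0);
        cutoffDay.set(Calendar.MINUTE, 0);
        cutoffDay.set(Calendar.SECOND, 0);
        cutoffDay.set(Calendar.MILLISECOND, 0);

        System.out.println("files older than " + cutoff + " ms are deletable");
        System.out.println("directories dated before " + cutoffDay.getTimeInMillis()
            + " ms can be removed wholesale");
      }
    }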
@@ -2657,8 +2754,8 @@ public class JobHistory {
           while (it.hasNext()) {
             MovedFileInfo info = it.next().getValue();
 
-            if (DEBUG_MODE && !printedOneMovedFile) {
-              System.err.println("HistoryCleaner.run a moved file: " + info.historyFile);
+            if (LOG.isDebugEnabled() && !printedOneMovedFile) {
+              LOG.debug("HistoryCleaner.run a moved file: " + info.historyFile);
               printedOneMovedFile = true;
             }