You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2012/08/27 17:33:40 UTC

svn commit: r1377714 - in /hadoop/common/branches/branch-1: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapr...

Author: acmurthy
Date: Mon Aug 27 15:33:40 2012
New Revision: 1377714

URL: http://svn.apache.org/viewvc?rev=1377714&view=rev
Log:
MAPREDUCE-4328. Add a JobTracker safemode to allow it to be resilient to NameNode failures. The safemode can be entered either automatically via the configurable background thread to monitor the NameNode or by the admin. In the safemode the JobTracker doesn't schedule new tasks, marks all failed tasks as KILLED for future retries and doesn't accept new job submissions.   

Added:
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
    hadoop/common/branches/branch-1/src/webapps/job/jobtracker.jsp

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Aug 27 15:33:40 2012
@@ -33,6 +33,13 @@ Release 1.2.0 - unreleased
     HADOOP-7754. Expose file descriptors from Hadoop-wrapped local 
     FileSystems (todd and ahmed via tucu)
 
+    MAPREDUCE-4328. Add a JobTracker safemode to allow it to be resilient to
+    NameNode failures. The safemode can be entered either automatically via
+    the configurable background thread to monitor the NameNode or by the
+    admin. In the safemode the JobTracker doesn't schedule new tasks, marks
+    all failed tasks as KILLED for future retries and doesn't accept new job
+    submissions. (acmurthy)
+
   IMPROVEMENTS
 
     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

Modified: hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Mon Aug 27 15:33:40 2012
@@ -1042,10 +1042,20 @@ class CapacityTaskScheduler extends Task
      */ 
     updateAllQueues(mapClusterCapacity, reduceClusterCapacity);
     
-    // schedule tasks
+    /*
+     * Schedule tasks
+     */
+    
     List<Task> result = new ArrayList<Task>();
-    addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots);
-    addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
+    
+    // Check for JT safe-mode
+    if (taskTrackerManager.isInSafeMode()) {
+      LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
+    } else {
+      addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots);
+      addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
+    }
+    
     return result;
   }
 

Modified: hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Mon Aug 27 15:33:40 2012
@@ -688,6 +688,12 @@ public class TestCapacityScheduler exten
     public QueueManager getQueueManager() {
       return qm;
     }
+
+    @Override
+    public boolean isInSafeMode() {
+      // TODO Auto-generated method stub
+      return false;
+    }
   }
   
   // represents a fake queue configuration info

Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Mon Aug 27 15:33:40 2012
@@ -412,7 +412,13 @@ public class FairScheduler extends TaskS
 
     // Update time waited for local maps for jobs skipped on last heartbeat
     updateLocalityWaitTimes(currentTime);
-    
+
+    // Check for JT safe-mode
+    if (taskTrackerManager.isInSafeMode()) {
+      LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
+      return null;
+    } 
+
     TaskTrackerStatus tts = tracker.getStatus();
 
     int mapsAssigned = 0; // loop counter for map in the below while loop

Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Mon Aug 27 15:33:40 2012
@@ -509,6 +509,12 @@ public class TestFairScheduler extends T
       trackerForTip.get(attemptIdStr).getTaskReports().remove(status);
       return true;
     }
+
+    @Override
+    public boolean isInSafeMode() {
+      // TODO Auto-generated method stub
+      return false;
+    }
   }
   
   protected JobConf conf;

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java Mon Aug 27 15:33:40 2012
@@ -50,4 +50,13 @@ public interface AdminOperationsProtocol
    * Refresh the node list at the {@link JobTracker} 
    */
   void refreshNodes() throws IOException;
+  
+  /**
+   * Set safe mode for the JobTracker.
+   * @param safeModeAction safe mode action
+   * @return current safemode
+   * @throws IOException
+   */
+  boolean setSafeMode(JobTracker.SafeModeAction safeModeAction) 
+      throws IOException;
 }

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java Mon Aug 27 15:33:40 2012
@@ -34,6 +34,9 @@ class AuditLogger {
     static final String REFRESH_QUEUE = "REFRESH_QUEUE";
     static final String REFRESH_NODES = "REFRESH_NODES";
     
+    static final String GET_SAFEMODE = "GET_SAFEMODE";
+    static final String SET_SAFEMODE = "SET_SAFEMODE";
+    
     // Some commonly used descriptions
     static final String UNAUTHORIZED_USER = "Unauthorized user";
   }

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java?rev=1377714&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java Mon Aug 27 15:33:40 2012
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.mapred.JobTracker.SafeModeAction;
+
+public class HDFSMonitorThread extends Thread {
+  
+  public static final Log LOG = LogFactory.getLog(HDFSMonitorThread.class);
+  
+  private final JobTracker jt;
+  private final FileSystem fs;
+  
+  private final int hdfsMonitorInterval;
+  
+  public HDFSMonitorThread(Configuration conf, JobTracker jt, FileSystem fs) {
+    super("JT-HDFS-Monitor-Thread");
+    this.jt = jt;
+    this.fs = fs;
+    this.hdfsMonitorInterval = 
+        conf.getInt(
+            JobTracker.JT_HDFS_MONITOR_THREAD_INTERVAL, 
+            JobTracker.DEFAULT_JT_HDFS_MONITOR_THREAD_INTERVAL_MS);
+    setDaemon(true);
+  }
+
+  @Override
+  public void run() {
+    
+    LOG.info("Starting HDFS Health Monitoring...");
+    
+    boolean previouslyHealthy = true;
+    boolean done = false;
+    
+    while (!done && !isInterrupted()) {
+      
+      boolean currentlyHealthy = DistributedFileSystem.isHealthy(fs.getUri());
+      if (currentlyHealthy != previouslyHealthy) {
+        
+        JobTracker.SafeModeAction action; 
+        if (currentlyHealthy) {
+          action = SafeModeAction.SAFEMODE_LEAVE;
+          LOG.info("HDFS healthy again, instructing JobTracker to leave " +
+              "'safemode' ...");
+        } else {
+          action = SafeModeAction.SAFEMODE_ENTER;
+          LOG.info("HDFS is unhealthy, instructing JobTracker to enter " +
+              "'safemode' ...");
+        }
+        
+        try {
+          if (jt.isInAdminSafeMode()) {
+            // Don't override admin-set safemode
+            LOG.info("JobTracker is in admin-set safemode, not overriding " +
+            		"through " + action);
+           previouslyHealthy = currentlyHealthy; 
+          } else {
+            previouslyHealthy = !(jt.setSafeModeInternal(action)); 
+                                                         //safemode => !healthy
+          }
+        } catch (IOException ioe) {
+          LOG.info("Failed to setSafeMode with action " + action, ioe);
+        }
+      }
+      
+      try {
+        Thread.sleep(hdfsMonitorInterval);
+      } catch (InterruptedException e) {
+        done = true;
+      }
+    }
+    
+    LOG.info("Stoping HDFS Health Monitoring...");
+  }
+  
+}

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Mon Aug 27 15:33:40 2012
@@ -80,6 +80,12 @@ class JobQueueTaskScheduler extends Task
   @Override
   public synchronized List<Task> assignTasks(TaskTracker taskTracker)
       throws IOException {
+    // Check for JT safe-mode
+    if (taskTrackerManager.isInSafeMode()) {
+      LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
+      return null;
+    } 
+
     TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     final int numTaskTrackers = clusterStatus.getTaskTrackers();

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon Aug 27 15:33:40 2012
@@ -18,13 +18,9 @@
 package org.apache.hadoop.mapred;
 
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.InputStreamReader;
 import java.io.Writer;
 import java.lang.management.ManagementFactory;
 import java.net.BindException;
@@ -53,6 +49,7 @@ import java.util.TreeSet;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,27 +58,33 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobSubmissionProtocol;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.AuditLogger.Constants;
-import org.apache.hadoop.mapred.Counters.CountersExceededException;
 import org.apache.hadoop.mapred.JobHistory.Keys;
-import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -89,6 +92,7 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
@@ -96,6 +100,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@@ -104,16 +109,6 @@ import org.apache.hadoop.util.HostsFileR
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-
-import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.security.Credentials;
 import org.mortbay.util.ajax.JSON;
 
 /*******************************************************
@@ -203,7 +198,7 @@ public class JobTracker implements MRCon
   
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
-  private static final int FS_ACCESS_RETRY_PERIOD = 10000;
+  private static final int FS_ACCESS_RETRY_PERIOD = 1000;
   static final String JOB_INFO_FILE = "job-info";
   static final String JOB_TOKEN_FILE = "jobToken";
   private DNSToSwitchMapping dnsToSwitchMapping;
@@ -276,6 +271,16 @@ public class JobTracker implements MRCon
     return clock;
   }
     
+  static final String JT_HDFS_MONITOR_ENABLE = 
+      "mapreduce.jt.hdfs.monitor.enable";
+  static final boolean DEFAULT_JT_HDFS_MONITOR_THREAD_ENABLE = false;
+  
+  static final String JT_HDFS_MONITOR_THREAD_INTERVAL = 
+      "mapreduce.jt.hdfs.monitor.interval.ms";
+  static final int DEFAULT_JT_HDFS_MONITOR_THREAD_INTERVAL_MS = 5000;
+  
+  private Thread hdfsMonitor;
+
   /**
    * Start the JobTracker with given configuration.
    * 
@@ -1872,8 +1877,179 @@ public class JobTracker implements MRCon
     this(conf, identifier, clock, new QueueManager(new Configuration(conf)));
   } 
   
+  private void initJTConf(JobConf conf) {
+    if (conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false)) {
+      LOG.warn(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY + 
+          " is enabled, disabling it");
+      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false);
+    }
+  }
+  
+  private void initializeFilesystem() throws IOException, InterruptedException {
+    // Connect to HDFS NameNode
+    while (!Thread.currentThread().isInterrupted() && fs == null) {
+      try {
+        fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
+          public FileSystem run() throws IOException {
+            return FileSystem.get(conf);
+          }});
+      } catch (IOException ie) {
+        fs = null;
+        LOG.info("Problem connecting to HDFS Namenode... re-trying", ie);
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+      }
+    }
+    
+    if (Thread.currentThread().isInterrupted()) {
+      throw new InterruptedException();
+    }
+    
+    // Ensure HDFS is healthy
+    if ("hdfs".equalsIgnoreCase(fs.getUri().getScheme())) {
+      while (!DistributedFileSystem.isHealthy(fs.getUri())) {
+        LOG.info("HDFS initialized but not 'healthy' yet, waiting...");
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+      }
+    }
+  }
+  
+  private void initialize() 
+      throws IOException, InterruptedException {
+    // initialize history parameters.
+    final JobTracker jtFinal = this;
+    
+    getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        JobHistory.init(jtFinal, conf, jtFinal.localMachine, 
+            jtFinal.startTime);
+        return true;
+      }
+    });
+
+    // start the recovery manager
+    recoveryManager = new RecoveryManager();
+    
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        // if we haven't contacted the namenode go ahead and do it
+        // clean up the system dir, which will only work if hdfs is out of 
+        // safe mode
+        if(systemDir == null) {
+          systemDir = new Path(getSystemDir());    
+        }
+        try {
+          FileStatus systemDirStatus = fs.getFileStatus(systemDir);
+          if (!systemDirStatus.getOwner().equals(
+              getMROwner().getShortUserName())) {
+            throw new AccessControlException("The systemdir " + systemDir +
+                " is not owned by " + getMROwner().getShortUserName());
+          }
+          if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
+            LOG.warn("Incorrect permissions on " + systemDir +
+                ". Setting it to " + SYSTEM_DIR_PERMISSION);
+            fs.setPermission(systemDir,new FsPermission(SYSTEM_DIR_PERMISSION));
+          }
+        } catch (FileNotFoundException fnf) {} //ignore
+        // Make sure that the backup data is preserved
+        FileStatus[] systemDirData = fs.listStatus(this.systemDir);
+        // Check if the history is enabled .. as we cant have persistence with 
+        // history disabled
+        if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
+            && systemDirData != null) {
+          for (FileStatus status : systemDirData) {
+            try {
+              recoveryManager.checkAndAddJob(status);
+            } catch (Throwable t) {
+              LOG.warn("Failed to add the job " + status.getPath().getName(), 
+                       t);
+            }
+          }
+          
+          // Check if there are jobs to be recovered
+          hasRestarted = recoveryManager.shouldRecover();
+          if (hasRestarted) {
+            break; // if there is something to recover else clean the sys dir
+          }
+        }
+        LOG.info("Cleaning up the system directory");
+        fs.delete(systemDir, true);
+        if (FileSystem.mkdirs(fs, systemDir, 
+            new FsPermission(SYSTEM_DIR_PERMISSION))) {
+          break;
+        }
+        LOG.error("Mkdirs failed to create " + systemDir);
+      } catch (AccessControlException ace) {
+        LOG.warn("Failed to operate on mapred.system.dir (" + systemDir 
+                 + ") because of permissions.");
+        LOG.warn("Manually delete the mapred.system.dir (" + systemDir 
+                 + ") and then start the JobTracker.");
+        LOG.warn("Bailing out ... ", ace);
+        throw ace;
+      } catch (IOException ie) {
+        LOG.info("problem cleaning system directory: " + systemDir, ie);
+      }
+      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+    }
+    
+    if (Thread.currentThread().isInterrupted()) {
+      throw new InterruptedException();
+    }
+    
+    // Same with 'localDir' except it's always on the local disk.
+    if (!hasRestarted) {
+      conf.deleteLocalFiles(SUBDIR);
+    }
+
+    // Initialize history DONE folder
+    FileSystem historyFS = getMROwner().doAs(
+        new PrivilegedExceptionAction<FileSystem>() {
+      public FileSystem run() throws IOException {
+        JobHistory.initDone(conf, fs);
+        final String historyLogDir = 
+          JobHistory.getCompletedJobHistoryLocation().toString();
+        infoServer.setAttribute("historyLogDir", historyLogDir);
+
+        infoServer.setAttribute
+          ("serialNumberDirectoryDigits",
+           Integer.valueOf(JobHistory.serialNumberDirectoryDigits()));
+
+        infoServer.setAttribute
+          ("serialNumberTotalDigits",
+           Integer.valueOf(JobHistory.serialNumberTotalDigits()));
+        
+        return new Path(historyLogDir).getFileSystem(conf);
+      }
+    });
+    infoServer.setAttribute("fileSys", historyFS);
+    infoServer.setAttribute("jobConf", conf);
+    infoServer.setAttribute("aclManager", aclsManager);
+
+    if (JobHistoryServer.isEmbedded(conf)) {
+      LOG.info("History server being initialized in embedded mode");
+      jobHistoryServer = new JobHistoryServer(conf, aclsManager, infoServer);
+      jobHistoryServer.start();
+      LOG.info("Job History Server web address: " + JobHistoryServer.getAddress(conf));
+    }
+
+    //initializes the job status store
+    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
+    
+    // Setup HDFS monitoring
+    if (this.conf.getBoolean(
+        JT_HDFS_MONITOR_ENABLE, DEFAULT_JT_HDFS_MONITOR_THREAD_ENABLE)) {
+      hdfsMonitor = new HDFSMonitorThread(this.conf, this, this.fs);
+      hdfsMonitor.start();
+    }
+
+  }
+  
   JobTracker(final JobConf conf, String identifier, Clock clock, QueueManager qm) 
-  throws IOException, InterruptedException { 
+  throws IOException, InterruptedException {
+    
+    initJTConf(conf);
+    
     this.queueManager = qm;
     this.clock = clock;
     // Set ports, start RPC servers, setup security policy etc.
@@ -1979,7 +2155,12 @@ public class JobTracker implements MRCon
     // Set service-level authorization security policy
     if (conf.getBoolean(
           ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
-      ServiceAuthorizationManager.refresh(conf, new MapReducePolicyProvider());
+      PolicyProvider policyProvider = 
+          (PolicyProvider)(ReflectionUtils.newInstance(
+              conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+                  MapReducePolicyProvider.class, PolicyProvider.class), 
+              conf));
+        ServiceAuthorizationManager.refresh(conf, policyProvider);
     }
     
     int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
@@ -2006,16 +2187,6 @@ public class JobTracker implements MRCon
     infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, 
         tmpInfoPort == 0, conf, aclsManager.getAdminsAcl());
     infoServer.setAttribute("job.tracker", this);
-    // initialize history parameters.
-    final JobTracker jtFinal = this;
-    getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
-      @Override
-      public Boolean run() throws Exception {
-        JobHistory.init(jtFinal, conf,jtFinal.localMachine, 
-            jtFinal.startTime);
-        return true;
-      }
-    });
     
     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
@@ -2035,125 +2206,12 @@ public class JobTracker implements MRCon
         infoBindAddress + ":" + this.infoPort); 
     LOG.info("JobTracker webserver: " + this.infoServer.getPort());
     
-    // start the recovery manager
-    recoveryManager = new RecoveryManager();
-    
-    while (!Thread.currentThread().isInterrupted()) {
-      try {
-        // if we haven't contacted the namenode go ahead and do it
-        if (fs == null) {
-          fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
-            public FileSystem run() throws IOException {
-              return FileSystem.get(conf);
-          }});
-        }
-        // clean up the system dir, which will only work if hdfs is out of 
-        // safe mode
-        if(systemDir == null) {
-          systemDir = new Path(getSystemDir());    
-        }
-        try {
-          FileStatus systemDirStatus = fs.getFileStatus(systemDir);
-          if (!systemDirStatus.getOwner().equals(
-              getMROwner().getShortUserName())) {
-            throw new AccessControlException("The systemdir " + systemDir +
-                " is not owned by " + getMROwner().getShortUserName());
-          }
-          if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
-            LOG.warn("Incorrect permissions on " + systemDir +
-                ". Setting it to " + SYSTEM_DIR_PERMISSION);
-            fs.setPermission(systemDir,new FsPermission(SYSTEM_DIR_PERMISSION));
-          }
-        } catch (FileNotFoundException fnf) {} //ignore
-        // Make sure that the backup data is preserved
-        FileStatus[] systemDirData = fs.listStatus(this.systemDir);
-        // Check if the history is enabled .. as we cant have persistence with 
-        // history disabled
-        if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
-            && systemDirData != null) {
-          for (FileStatus status : systemDirData) {
-            try {
-              recoveryManager.checkAndAddJob(status);
-            } catch (Throwable t) {
-              LOG.warn("Failed to add the job " + status.getPath().getName(), 
-                       t);
-            }
-          }
-          
-          // Check if there are jobs to be recovered
-          hasRestarted = recoveryManager.shouldRecover();
-          if (hasRestarted) {
-            break; // if there is something to recover else clean the sys dir
-          }
-        }
-        LOG.info("Cleaning up the system directory");
-        fs.delete(systemDir, true);
-        if (FileSystem.mkdirs(fs, systemDir, 
-            new FsPermission(SYSTEM_DIR_PERMISSION))) {
-          break;
-        }
-        LOG.error("Mkdirs failed to create " + systemDir);
-      } catch (AccessControlException ace) {
-        LOG.warn("Failed to operate on mapred.system.dir (" + systemDir 
-                 + ") because of permissions.");
-        LOG.warn("Manually delete the mapred.system.dir (" + systemDir 
-                 + ") and then start the JobTracker.");
-        LOG.warn("Bailing out ... ", ace);
-        throw ace;
-      } catch (IOException ie) {
-        LOG.info("problem cleaning system directory: " + systemDir, ie);
-      }
-      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
-    }
-    
-    if (Thread.currentThread().isInterrupted()) {
-      throw new InterruptedException();
-    }
-    
-    // Same with 'localDir' except it's always on the local disk.
-    if (!hasRestarted) {
-      jobConf.deleteLocalFiles(SUBDIR);
-    }
-
-    // Initialize history DONE folder
-    FileSystem historyFS = getMROwner().doAs(
-        new PrivilegedExceptionAction<FileSystem>() {
-      public FileSystem run() throws IOException {
-        JobHistory.initDone(conf, fs);
-        final String historyLogDir = 
-          JobHistory.getCompletedJobHistoryLocation().toString();
-        infoServer.setAttribute("historyLogDir", historyLogDir);
-
-        infoServer.setAttribute
-          ("serialNumberDirectoryDigits",
-           Integer.valueOf(JobHistory.serialNumberDirectoryDigits()));
-
-        infoServer.setAttribute
-          ("serialNumberTotalDigits",
-           Integer.valueOf(JobHistory.serialNumberTotalDigits()));
-        
-        return new Path(historyLogDir).getFileSystem(conf);
-      }
-    });
-    infoServer.setAttribute("fileSys", historyFS);
-    infoServer.setAttribute("jobConf", conf);
-    infoServer.setAttribute("aclManager", aclsManager);
-
-    if (JobHistoryServer.isEmbedded(conf)) {
-      LOG.info("History server being initialized in embedded mode");
-      jobHistoryServer = new JobHistoryServer(conf, aclsManager, infoServer);
-      jobHistoryServer.start();
-      LOG.info("Job History Server web address: " + JobHistoryServer.getAddress(conf));
-    }
-
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
             DNSToSwitchMapping.class), conf);
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
-    //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -2244,6 +2302,17 @@ public class JobTracker implements MRCon
    * Run forever
    */
   public void offerService() throws InterruptedException, IOException {
+    // start the inter-tracker server 
+    this.interTrackerServer.start();
+    
+    // Initialize the JobTracker FileSystem within safemode
+    setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
+    initializeFilesystem();
+    setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
+    
+    // Initialize JobTracker
+    initialize();
+    
     // Prepare for recovery. This is done irrespective of the status of restart
     // flag.
     while (true) {
@@ -2283,12 +2352,10 @@ public class JobTracker implements MRCon
       completedJobsStoreThread.start();
     }
 
-    // start the inter-tracker server once the jt is ready
-    this.interTrackerServer.start();
-    
     synchronized (this) {
       state = State.RUNNING;
     }
+    
     LOG.info("Starting RUNNING");
     
     this.interTrackerServer.join();
@@ -3488,6 +3555,12 @@ public class JobTracker implements MRCon
   // returns cleanup tasks first, then setup tasks.
   synchronized List<Task> getSetupAndCleanupTasks(
     TaskTrackerStatus taskTracker) throws IOException {
+    
+    // Don't assign *any* new task in safemode
+    if (isInSafeMode()) {
+      return null;
+    }
+    
     int maxMapTasks = taskTracker.getMaxMapSlots();
     int maxReduceTasks = taskTracker.getMaxReduceSlots();
     int numMaps = taskTracker.countOccupiedMapSlots();
@@ -3622,6 +3695,10 @@ public class JobTracker implements MRCon
   public JobStatus submitJob(JobID jobId, String jobSubmitDir,
       UserGroupInformation ugi, Credentials ts, boolean recovered)
       throws IOException {
+    if (isInSafeMode()) {
+      throw new IOException("JobTracker in safemode");
+    }
+    
     JobInfo jobInfo = null;
     if (ugi == null) {
       ugi = UserGroupInformation.getCurrentUser();
@@ -4315,6 +4392,11 @@ public class JobTracker implements MRCon
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
    */
   public String getSystemDir() {
+    // Might not be initialized yet, TT handles this
+    if (isInSafeMode()) {
+      return null;
+    }
+    
     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
     return fs.makeQualified(sysDir).toString();
   }
@@ -5083,4 +5165,81 @@ public class JobTracker implements MRCon
     return map;
   }
   // End MXbean implementaiton
+
+  /**
+   * JobTracker SafeMode
+   */
+  // SafeMode actions
+  public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
+  
+  private AtomicBoolean safeMode = new AtomicBoolean(false);
+  private AtomicBoolean adminSafeMode = new AtomicBoolean(false);
+  private String adminSafeModeUser = "";
+  
+  public boolean setSafeMode(JobTracker.SafeModeAction safeModeAction) 
+      throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    // Anyone can check JT safe-mode
+    if (safeModeAction == SafeModeAction.SAFEMODE_GET) {
+      boolean safeMode = this.safeMode.get();
+      LOG.info("Getting safemode information: safemode=" + safeMode + ". " +
+          "Requested by : " +
+          UserGroupInformation.getCurrentUser().getShortUserName());
+      AuditLogger.logSuccess(user, Constants.GET_SAFEMODE, 
+          Constants.JOBTRACKER);
+      return safeMode;
+    }
+    
+    // Check access for modifications to safe-mode
+    if (!aclsManager.isMRAdmin(UserGroupInformation.getCurrentUser())) {
+      AuditLogger.logFailure(user, Constants.SET_SAFEMODE, 
+          aclsManager.getAdminsAcl().toString(), Constants.JOBTRACKER, 
+          Constants.UNAUTHORIZED_USER);
+      throw new AccessControlException(user + 
+                                       " is not authorized to set " +
+                                       " JobTracker safemode.");
+    }
+    AuditLogger.logSuccess(user, Constants.SET_SAFEMODE, Constants.JOBTRACKER);
+
+    boolean currSafeMode = setSafeModeInternal(safeModeAction);
+    adminSafeMode.set(currSafeMode);
+    adminSafeModeUser = user;
+    return currSafeMode;
+  }
+  
+  boolean isInAdminSafeMode() {
+    return adminSafeMode.get();
+  }
+  
+  boolean setSafeModeInternal(JobTracker.SafeModeAction safeModeAction) 
+      throws IOException {
+    if (safeModeAction != SafeModeAction.SAFEMODE_GET) {
+      boolean safeMode = false;
+      if (safeModeAction == SafeModeAction.SAFEMODE_ENTER) {
+        safeMode = true;
+      } else if (safeModeAction == SafeModeAction.SAFEMODE_LEAVE) {
+        safeMode = false;
+      }
+      LOG.info("Setting safe mode to " + safeMode + ". Requested by : " +
+          UserGroupInformation.getCurrentUser().getShortUserName());
+      this.safeMode.set(safeMode);
+    }
+    return this.safeMode.get();
+  }
+
+  public boolean isInSafeMode() {
+    return safeMode.get();
+  }
+  
+  String getSafeModeText() {
+    if (!isInSafeMode())
+      return "OFF";
+    String safeModeInfo = 
+        adminSafeMode.get() ? 
+            "Set by admin <strong>" + adminSafeModeUser + "</strong>": 
+            "HDFS unavailable";
+    return "<em>ON - " + safeModeInfo + "</em>";
+  }
+  
 }

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Mon Aug 27 15:33:40 2012
@@ -31,6 +31,7 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapred.TaskStatus.State;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.net.Node;
@@ -604,10 +605,18 @@ class TaskInProgress {
           
       changed = oldState != newState;
     }
+    
     // if task is a cleanup attempt, do not replace the complete status,
     // update only specific fields.
     // For example, startTime should not be updated, 
     // but finishTime has to be updated.
+    
+    // Don't fail tasks when JobTracker is in safe-mode
+    if (status.getRunState() == State.FAILED && jobtracker.isInSafeMode()) {
+      LOG.info("JT is in safe-mode; marking " + taskid + " as KILLED");
+      status.setRunState(State.KILLED);
+    }
+
     if (!isCleanupAttempt(taskid)) {
       taskStatuses.put(taskid, status);
     } else {

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon Aug 27 15:33:40 2012
@@ -1704,8 +1704,17 @@ public class TaskTracker implements MRCo
           }
           
           String dir = jobClient.getSystemDir();
-          if (dir == null) {
-            throw new IOException("Failed to get system directory");
+          while (dir == null) {
+            LOG.info("Failed to get system directory...");
+            
+            // Re-try
+            try {
+              // Sleep interval: 1000 ms - 5000 ms
+              int sleepInterval = 1000 + r.nextInt(4000);
+              Thread.sleep(sleepInterval);
+            } catch (InterruptedException ie) 
+            {}
+            dir = jobClient.getSystemDir();
           }
           systemDirectory = new Path(dir);
           systemFS = systemDirectory.getFileSystem(fConf);

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java Mon Aug 27 15:33:40 2012
@@ -112,4 +112,10 @@ interface TaskTrackerManager {
    * @param job JobInProgress object
    */
   public void failJob(JobInProgress job);
+  
+  /**
+   * Get safe mode.
+   * @return
+   */
+  public boolean isInSafeMode();
 }

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java Mon Aug 27 15:33:40 2012
@@ -22,7 +22,6 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.AdminOperationsProtocol;
@@ -57,7 +56,9 @@ public class MRAdmin extends Configured 
     "The full syntax is: \n\n" +
     "hadoop mradmin [-refreshServiceAcl] [-refreshQueues] " +
     "[-refreshNodes] [-refreshUserToGroupsMappings] " +
-    "[-refreshSuperUserGroupsConfiguration] [-help [cmd]]\n";
+    "[-refreshSuperUserGroupsConfiguration] " +
+    "[-safemode <enter | leave | wait | get> " +
+    "[-help [cmd]]\n";
 
   String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
     "\t\tJobtracker will reload the authorization policy file.\n";
@@ -74,6 +75,14 @@ public class MRAdmin extends Configured 
   String refreshNodes =
     "-refreshNodes: Refresh the hosts information at the jobtracker.\n";
   
+  String safemode = "-safemode <enter|leave|get|wait>:  Safe mode maintenance command.\n" + 
+      "\t\tSafe mode is a JobTracker state in which it\n" +
+      "\t\t\t1.  does not accept new job submissions\n" +
+      "\t\t\t2.  does not schedule any new tasks\n" +
+      "\t\t\t3.  does not fail any tasks due to any error\n" +
+      "\t\tSafe mode can be entered manually, but then\n" +
+      "\t\tit can only be turned off manually as well.\n";
+      
   String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
     "\t\tis specified.\n";
 
@@ -87,6 +96,8 @@ public class MRAdmin extends Configured 
     System.out.println(refreshSuperUserGroupsConfiguration);
   }  else if ("refreshNodes".equals(cmd)) {
     System.out.println(refreshNodes);
+  }  else if ("safemode".equals(cmd)) {
+    System.out.println(safemode);
   } else if ("help".equals(cmd)) {
     System.out.println(help);
   } else {
@@ -125,7 +136,8 @@ public class MRAdmin extends Configured 
       System.err.println("           [-refreshQueues]");
       System.err.println("           [-refreshUserToGroupsMappings]");
       System.err.println("           [-refreshSuperUserGroupsConfiguration]");
-      System.err.println("           [-refreshNodes]");
+      System.err.println("           [-refreshNodes]");      
+      System.err.println("           [-safemode <enter | leave | get | wait>]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
@@ -208,6 +220,58 @@ public class MRAdmin extends Configured 
     return 0;
   }
 
+  private int setSafeMode(String actionString) throws IOException {
+    JobTracker.SafeModeAction action;
+    Boolean waitExitSafe = false;
+
+    if ("leave".equalsIgnoreCase(actionString)) {
+      action = JobTracker.SafeModeAction.SAFEMODE_LEAVE;
+    } else if ("enter".equalsIgnoreCase(actionString)) {
+      action = JobTracker.SafeModeAction.SAFEMODE_ENTER;
+    } else if ("get".equalsIgnoreCase(actionString)) {
+      action = JobTracker.SafeModeAction.SAFEMODE_GET;
+    } else if ("wait".equalsIgnoreCase(actionString)) {
+      action = JobTracker.SafeModeAction.SAFEMODE_GET;
+      waitExitSafe = true;
+    } else {
+      printUsage("-safemode");
+      return -1;
+    }
+
+    // Get the current configuration
+    Configuration conf = getConf();
+    
+    // Create the client
+    AdminOperationsProtocol adminOperationsProtocol = 
+      (AdminOperationsProtocol) 
+      RPC.getProxy(AdminOperationsProtocol.class, 
+                   AdminOperationsProtocol.versionID, 
+                   JobTracker.getAddress(conf), getUGI(conf), conf,
+                   NetUtils.getSocketFactory(conf, 
+                                             AdminOperationsProtocol.class));
+    
+
+    boolean inSafeMode = adminOperationsProtocol.setSafeMode(action);
+
+    //
+    // If we are waiting for safemode to exit, then poll and
+    // sleep till we are out of safemode.
+    //
+    if (waitExitSafe) {
+      while (inSafeMode) {
+        try {
+          Thread.sleep(3000);
+        } catch (java.lang.InterruptedException e) {
+          throw new IOException("Wait Interrupted");
+        }
+        inSafeMode = adminOperationsProtocol.setSafeMode(action);
+      }
+    }
+
+    System.out.println("Safe mode is " + (inSafeMode ? "ON" : "OFF"));
+
+    return 0;
+  }
   
   /**
    * refreshSuperUserGroupsConfiguration {@link JobTracker}.
@@ -297,6 +361,12 @@ public class MRAdmin extends Configured 
         return exitCode;
       }
     }
+    if ("-safemode".equals(cmd)) {
+      if (args.length != 2) {
+        printUsage(cmd);
+        return exitCode;
+      }
+    }
     
     exitCode = 0;
     try {
@@ -310,6 +380,8 @@ public class MRAdmin extends Configured 
         exitCode = refreshSuperUserGroupsConfiguration();
       } else if ("-refreshNodes".equals(cmd)) {
         exitCode = refreshNodes();
+      } else if ("-safemode".equals(cmd)) {
+        exitCode = setSafeMode(args[i++]);
       } else if ("-help".equals(cmd)) {
         if (i < args.length) {
           printUsage(args[i]);

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Mon Aug 27 15:33:40 2012
@@ -238,6 +238,12 @@ public class TestJobQueueTaskScheduler e
       status.setRunState(TaskStatus.State.RUNNING);
       trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
     }
+
+    @Override
+    public boolean isInSafeMode() {
+      // TODO Auto-generated method stub
+      return false;
+    }
     
   }
   

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java?rev=1377714&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java Mon Aug 27 15:33:40 2012
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobTracker.SafeModeAction;
+import org.apache.hadoop.mapred.tools.MRAdmin;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** 
+ * A test for JobTracker safemode. In safemode, no tasks are scheduled, and
+ * no tasks are marked as failed (they are killed instead).
+ */
+public class TestJobTrackerQuiescence {
+  final Path testDir = 
+    new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
+  final Path inDir = new Path(testDir, "input");
+  final Path shareDir = new Path(testDir, "share");
+  final Path outputDir = new Path(testDir, "output");
+  
+  final int maxMapTasks = 1;
+  
+  private MiniDFSCluster dfs;
+  private MiniMRCluster mr;
+  private FileSystem fileSys;
+  private JobTracker jt;
+  
+  private static final Log LOG = 
+    LogFactory.getLog(TestJobTrackerQuiescence.class);
+  
+  @Before
+  public void setUp() throws IOException {
+    
+    Configuration conf = new Configuration();
+    conf.setBoolean("dfs.replication.considerLoad", false);
+    dfs = new MiniDFSCluster(conf, 1, true, null, null);
+    dfs.waitActive();
+    fileSys = dfs.getFileSystem();
+    
+    // clean up
+    fileSys.delete(testDir, true);
+    
+    if (!fileSys.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+
+    // Write the input file
+    UtilsForTests.writeFile(dfs.getNameNode(), conf, 
+                            new Path(inDir + "/file"), (short)1);
+
+    dfs.startDataNodes(conf, 1, true, null, null, null, null);
+    dfs.waitActive();
+    String namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
+    + (dfs.getFileSystem()).getUri().getPort();
+    
+    JobConf jtConf = new JobConf();
+    jtConf.setInt("mapred.tasktracker.map.tasks.maximum", maxMapTasks);
+    jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+    jtConf.setBoolean(JobTracker.JT_HDFS_MONITOR_ENABLE, true);
+    jtConf.setInt(JobTracker.JT_HDFS_MONITOR_THREAD_INTERVAL, 1000);
+    mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);
+    mr.waitUntilIdle();
+    mr.setInlineCleanupThreads();
+    jt = mr.getJobTrackerRunner().getJobTracker();
+  }
+  
+  @After
+  public void tearDown() {
+    if (mr != null) {
+      try {
+        mr.shutdown();
+      } catch (Exception e) {}
+    }
+    if (dfs != null) {
+      try {
+        dfs.shutdown();
+      } catch (Exception e) {}
+    }
+  }
+  
+  @Test
+  public void testHDFSMonitor() throws Exception {
+    /*
+     * Try 'automatic' safe-mode 
+     */
+    // Put HDFS in safe-mode
+    dfs.getNameNode().setSafeMode(
+        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_ENTER);
+    int numTries = 20;
+    while (!jt.isInSafeMode() && numTries > 0) {
+      Thread.sleep(1000);
+      --numTries;
+    }
+    
+    // By now JT should be in safe-mode
+    assertEquals(true, jt.isInSafeMode());
+      
+    // Remove HDFS from safe-mode
+    dfs.getNameNode().setSafeMode(
+        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+    
+    numTries = 20;
+    while (jt.isInSafeMode() && numTries > 0) {
+      Thread.sleep(1000);
+      --numTries;
+    }
+    
+    // By now JT should not be in safe-mode
+    assertEquals(false, jt.isInSafeMode());
+      
+    /*
+     * Now ensure 'automatic' mode doesn't interfere with 'admin set' safe-mode
+     */
+    dfs.getNameNode().setSafeMode(
+        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_ENTER);
+    numTries = 20;
+    while (!jt.isInSafeMode() && numTries > 0) {
+      Thread.sleep(1000);
+      --numTries;
+    }
+    
+    // By now JT should be in safe-mode
+    assertEquals(true, jt.isInSafeMode());
+
+    // Now, put JT in admin set safe-mode
+    enterSafeMode();
+    
+    // Bring HDFS back from safe-mode
+    dfs.getNameNode().setSafeMode(
+        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+    
+    numTries = 20;
+    while (jt.isInSafeMode() && numTries > 0) {
+      Thread.sleep(1000);
+      --numTries;
+    }
+    
+    // But now JT should *still* be in safe-mode
+    assertEquals(true, jt.isInSafeMode());
+    assertEquals(true, jt.isInAdminSafeMode());
+    
+    // Leave JT safe-mode
+    leaveSafeMode();
+    assertEquals(false, jt.isInAdminSafeMode());
+    
+    // Bounce HDFS back in-out
+    dfs.getNameNode().setSafeMode(
+        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_ENTER);
+    Thread.sleep(5000);
+    dfs.getNameNode().setSafeMode(
+        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+    
+    numTries = 20;
+    while (jt.isInSafeMode() && numTries > 0) {
+      Thread.sleep(1000);
+      --numTries;
+    }
+    
+    // By now JT should not be in safe-mode
+    assertEquals(false, jt.isInSafeMode());
+      
+  }
+  
+  @Test
+  public void testMRAdminSafeModeWait() throws Exception {
+    
+    enterSafeMode();
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<Void> future = executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        MRAdmin mrAdmin = new MRAdmin(mr.createJobConf());
+        mrAdmin.run(new String[] { "-safemode", "wait" });
+        return null;
+      }
+    });
+    try {
+      future.get(1, TimeUnit.SECONDS);
+      fail("JT should still be in safemode");
+    } catch (TimeoutException e) {
+      // expected
+    }
+
+    leaveSafeMode();
+
+    try {
+      future.get(10, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      fail("JT should no longer be in safemode");
+    }
+  }
+  
+  @Test
+  public void testJobsPauseInSafeMode() throws Exception {
+    FileSystem fileSys = dfs.getFileSystem();
+    JobConf jobConf = mr.createJobConf();
+    int numMaps = 10;
+    int numReds = 1;
+    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
+    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
+    jobConf.set("user.name", UserGroupInformation.getCurrentUser().getUserName());
+    // Configure the job
+    JobConf job = configureJob(jobConf, numMaps, numReds, 
+                               mapSignalFile, redSignalFile);
+      
+    fileSys.delete(shareDir, true);
+    
+    // Submit the job   
+    JobClient jobClient = new JobClient(job);
+    RunningJob rJob = jobClient.submitJob(job);
+    JobID id = rJob.getID();
+    
+    // wait for the job to be inited
+    mr.initializeJob(id);
+    
+    // Make sure that the master job is 50% completed
+    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
+      UtilsForTests.waitFor(10);
+    }
+    assertEquals(numMaps / 2, getCompletedMapCount(rJob));
+
+    enterSafeMode();
+
+    // Signal all the maps to complete
+    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
+    
+    // Signal the reducers to complete
+    UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
+                              redSignalFile);
+
+    // only assigned maps complete in safemode since no more maps may be
+    // assigned
+    Thread.sleep(10000);
+    assertEquals(numMaps / 2 + maxMapTasks, getCompletedMapCount(rJob));
+
+    leaveSafeMode();
+    
+    // job completes after leaving safemode
+    UtilsForTests.waitTillDone(jobClient);
+
+    assertTrue(rJob.isSuccessful());
+  }
+  
+  private int getCompletedMapCount(RunningJob rJob) throws IOException {
+    TaskCompletionEvent[] taskCompletionEvents = rJob.getTaskCompletionEvents(0);
+    int mapCount = 0;
+    for (TaskCompletionEvent tce : taskCompletionEvents) {
+      if (tce.isMap) {
+        mapCount++;
+      }
+    }
+    return mapCount;
+  }
+
+  private JobConf configureJob(JobConf conf, int maps, int reduces,
+      String mapSignal, String redSignal) throws IOException {
+    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, maps,
+        reduces, "test-jt-safemode", mapSignal, redSignal);
+    return conf;
+  }
+  
+  private void enterSafeMode() throws IOException {
+    jt.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+  }
+
+  private void leaveSafeMode() throws IOException {
+    jt.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+  }
+}

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Mon Aug 27 15:33:40 2012
@@ -183,6 +183,12 @@ public class TestParallelInitialization 
         listener.jobAdded(job);
       }
     }
+
+    @Override
+    public boolean isInSafeMode() {
+      // TODO Auto-generated method stub
+      return false;
+    }
   }
   
   protected JobConf jobConf;

Modified: hadoop/common/branches/branch-1/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/webapps/job/jobtracker.jsp?rev=1377714&r1=1377713&r2=1377714&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/common/branches/branch-1/src/webapps/job/jobtracker.jsp Mon Aug 27 15:33:40 2012
@@ -107,7 +107,7 @@
 <b>Compiled:</b> <%= VersionInfo.getDate()%> by 
                  <%= VersionInfo.getUser()%><br>
 <b>Identifier:</b> <%= tracker.getTrackerIdentifier()%><br>                 
-                   
+<b>SafeMode:</b> <%= tracker.getSafeModeText()%><br>                    
 <hr>
 <h2>Cluster Summary (Heap Size is <%= StringUtils.byteDesc(Runtime.getRuntime().totalMemory()) %>/<%= StringUtils.byteDesc(Runtime.getRuntime().maxMemory()) %>)</h2>
 <%