Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC

svn commit: r1495297 [38/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java Fri Jun 21 06:37:27 2013
@@ -34,8 +34,7 @@ import org.junit.*;
  * restart doesn't schedule any new tasks and waits for the (old) trackers to 
  * join back.
  */
-/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
- */
+
 @Ignore
 public class TestJobTrackerSafeMode extends TestCase {
   final Path testDir = 
@@ -153,6 +152,7 @@ public class TestJobTrackerSafeMode exte
     mr.getTaskTrackerRunner(trackerToKill).getTaskTracker().shutdown();
     mr.stopTaskTracker(trackerToKill);
 
+    LOG.info("Starting the jobtracker...");
     // Restart the jobtracker
     mr.startJobTracker();
 
@@ -169,8 +169,6 @@ public class TestJobTrackerSafeMode exte
     LOG.info("Start a new tracker");
     mr.startTaskTracker(null, null, ++numTracker, numDir);
 
-    // Check if the jobs are still running
-    
     // Wait for the tracker to be lost
     boolean shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
     while (!checkTrackers(jobtracker, trackers, lostTrackers)) {
@@ -181,20 +179,55 @@ public class TestJobTrackerSafeMode exte
       // snapshot jobtracker's scheduling status
       shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
     }
-
-    assertTrue("JobTracker hasnt opened up scheduling even all the" 
-               + " trackers were recovered", 
-               jobtracker.recoveryManager.shouldSchedule());
-    
-    assertEquals("Recovery manager is in inconsistent state", 
-                 0, jobtracker.recoveryManager.recoveredTrackers.size());
     
+    assertTrue("JobTracker has not opened up scheduling after all the"
+        + " trackers were recovered", shouldSchedule);
+
+    assertEquals("Recovery manager is in inconsistent state", 0,
+        jobtracker.recoveryManager.recoveredTrackers.size());
+
+    // Signal the maps to complete
+    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
+
+    // Signal the reducers to complete
+    UtilsForTests
+        .signalTasks(dfs, fileSys, false, mapSignalFile, redSignalFile);
     // wait for the job to be complete
     UtilsForTests.waitTillDone(jobClient);
   }
 
   private boolean checkTrackers(JobTracker jobtracker, Set<String> present, 
                                 Set<String> absent) {
+    while (jobtracker.getClusterStatus(true).getActiveTrackerNames().size() != 3) {
+      LOG.info("Waiting for Initialize all Task Trackers");
+      UtilsForTests.waitFor(1000);
+    }
+    // Check whether the task tracker has been reinitialized
+    boolean found = false;
+    String strNewTrackerName = (String) (present.toArray()[0]);
+    LOG.info("Number of Trackers: "
+        + jobtracker.getClusterStatus(true).getActiveTrackerNames().size());
+    for (String trackername : jobtracker.getClusterStatus(true)
+        .getActiveTrackerNames()) {
+      if (trackername.equalsIgnoreCase((String) (present.toArray()[0]))) {
+        found = true;
+      } else {
+        String[] trackerhostnames = trackername.split(":");
+        CharSequence cseq = new String(trackerhostnames[0]);
+        if (((String) (present.toArray()[0])).contains(cseq)) {
+          strNewTrackerName = trackername;
+          found = false;
+          break;
+        }
+      }
+    }
+    if (!found) {
+      present.remove(((String) (present.toArray()[0])));
+      LOG.info("Old tracker on this machine got reinited, "
+          + "Tracker added with new port " + strNewTrackerName);
+      present.add(strNewTrackerName);
+    }
+    
     long jobtrackerRecoveryFinishTime = 
       jobtracker.getStartTime() + jobtracker.getRecoveryDuration();
     for (String trackerName : present) {
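
The matching logic added to checkTrackers() above handles the case where a
tasktracker comes back on a new port: an exact name match means the tracker
survived, while a host-only match means it was reinitialized and the entry in
"present" must be swapped for the new name. A minimal standalone sketch of the
same idea, assuming branch-1's "host:port"-style tracker names (the
TrackerNameMatcher class below is hypothetical, not part of this patch):

    import java.util.List;

    public class TrackerNameMatcher {
      /**
       * Resolves oldName against the active tracker list: an exact match
       * means the tracker came back on the same port; a host-only match
       * means it was reinitialized on a new port. Returns null if the host
       * is gone entirely.
       */
      public static String resolve(String oldName, List<String> active) {
        String oldHost = oldName.split(":")[0];
        for (String candidate : active) {
          if (candidate.equalsIgnoreCase(oldName)) {
            return candidate;                // same tracker, same port
          }
          if (candidate.split(":")[0].equalsIgnoreCase(oldHost)) {
            return candidate;                // same host, new port
          }
        }
        return null;                         // tracker not present at all
      }
    }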

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJvmReuse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJvmReuse.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJvmReuse.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJvmReuse.java Fri Jun 21 06:37:27 2013
@@ -17,11 +17,12 @@
  */
 package org.apache.hadoop.mapred;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.DataOutputStream;
 import java.io.IOException;
 
-import junit.framework.TestCase;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -29,9 +30,9 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.junit.Ignore;
+import org.junit.Test;
 
-@Ignore
-public class TestJvmReuse extends TestCase {
+public class TestJvmReuse {
   private static Path rootDir = new Path(System.getProperty("test.build.data",
       "/tmp"), TestJvmReuse.class.getName());
   private int numMappers = 5;
@@ -141,6 +142,8 @@ public class TestJvmReuse extends TestCa
     }
   }
 
+  @Ignore
+  @Test
   public void testTaskLogs() throws IOException {
     MiniMRCluster mr = null;
     try {

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java Fri Jun 21 06:37:27 2013
@@ -359,6 +359,7 @@ public class TestKillSubProcesses extend
     try {
       JobConf conf = new JobConf();
       conf.setLong(JvmManager.JvmManagerForType.DELAY_BEFORE_KILL_KEY, 0L);
+      conf.setFloat(JobTracker.JT_HEARTBEATS_SCALING_FACTOR, 500);
       mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
 
       // run the TCs

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestLostTracker.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestLostTracker.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestLostTracker.java Fri Jun 21 06:37:27 2013
@@ -101,6 +101,25 @@ public class TestLostTracker extends Tes
       testTaskStatuses(mtip.getTaskStatuses());
     }
     
+    // Before validating job history, wait for the history file to be available
+    JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
+    long beginWaiting = System.currentTimeMillis();
+    final long MAX_WAIT_TIME = 5 * 60 * 1000;
+    while (System.currentTimeMillis() - beginWaiting < MAX_WAIT_TIME) {
+      if (!jip.getHistoryFile().equals("")) {
+        break;
+      } else {
+        try {
+          Thread.sleep(50);
+        } catch (InterruptedException ie) {
+          // do nothing
+        }
+      }
+    }
+
+    assertFalse("Job history file needs to be set for further validation", jip
+        .getHistoryFile().equals(""));
+
     // validate the history file
     TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);
     TestJobHistory.validateJobHistoryFileContent(mr, rJob, job);
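
The new wait loop above polls every 50 ms for the job history file, bounded by
a five-minute deadline, before asserting. This poll-with-deadline idiom recurs
throughout these tests; a hedged generic helper (names are illustrative, not
from the patch) could look like:

    import java.util.concurrent.Callable;

    public class WaitUtil {
      /**
       * Polls condition every pollMillis until it returns true or
       * maxWaitMillis elapses; returns whether the condition was met.
       */
      public static boolean waitFor(Callable<Boolean> condition,
                                    long maxWaitMillis, long pollMillis)
          throws Exception {
        long deadline = System.currentTimeMillis() + maxWaitMillis;
        while (System.currentTimeMillis() < deadline) {
          if (condition.call()) {
            return true;
          }
          try {
            Thread.sleep(pollMillis);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();  // preserve interrupt status
            return false;
          }
        }
        return false;
      }
    }

With such a helper the block above reduces to one waitFor(...) call plus a
single assertTrue, with the condition supplied as an anonymous Callable on
branch-1's Java 6.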

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java Fri Jun 21 06:37:27 2013
@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.Date;
+import java.text.SimpleDateFormat;
 import junit.framework.TestCase;
 import org.apache.hadoop.hdfs.TestHDFSServerPorts;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -62,7 +64,8 @@ public class TestMRServerPorts extends T
     conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
     JobTracker jt = null;
     try {
-      jt = JobTracker.startTracker(conf);
+      String uniqid = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
+      jt = JobTracker.startTracker(conf, uniqid, true);
       runner.setJobTracker(jt);
       runner.start();
       conf.set("mapred.job.tracker", "localhost:" + jt.getTrackerPort());
@@ -90,7 +93,8 @@ public class TestMRServerPorts extends T
   throws IOException, InterruptedException {
     JobTracker jt = null;
     try {
-      jt = JobTracker.startTracker(conf);
+      String uniqid = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
+      jt = JobTracker.startTracker(conf, uniqid, true);
     } catch(IOException e) {
       if (e instanceof java.net.BindException)
         return false;

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java Fri Jun 21 06:37:27 2013
@@ -58,7 +58,8 @@ public class TestMapredHeartbeat extends
       
       // test configured heartbeat interval is capped with min value
       taskTrackers = 5;
-      conf.setInt(JobTracker.JT_HEARTBEATS_IN_SECOND, 10);
+      conf.setInt(JobTracker.JT_HEARTBEATS_IN_SECOND, 
+          (int)Math.ceil((taskTrackers * 1000.0) / MRConstants.HEARTBEAT_INTERVAL_MIN) );
       mr = new MiniMRCluster(taskTrackers, "file:///", 3, 
           null, null, conf);
       jc = new JobClient(mr.createJobConf());
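
The replaced constant 10 is now derived from the cluster size and the minimum
heartbeat interval: the value is chosen so that the uncapped interval (roughly
taskTrackers * 1000 / heartbeatsPerSecond milliseconds) lands just below the
floor, which is exactly what the "capped with min value" branch of the test
needs to exercise. A hedged worked example, assuming a 300 ms
HEARTBEAT_INTERVAL_MIN (verify against MRConstants in your tree):

    public class HeartbeatMath {
      public static void main(String[] args) {
        int taskTrackers = 5;
        int minIntervalMs = 300;  // assumed MRConstants.HEARTBEAT_INTERVAL_MIN
        int heartbeatsPerSecond =
            (int) Math.ceil((taskTrackers * 1000.0) / minIntervalMs);
        // ceil(5000 / 300) = ceil(16.67) = 17
        System.out.println("JT_HEARTBEATS_IN_SECOND = " + heartbeatsPerSecond);
        // Uncapped interval: 5 * 1000 / 17 ~ 294 ms < 300 ms, so the
        // jobtracker clamps it to the configured minimum.
        System.out.println("uncapped interval ~ "
            + (taskTrackers * 1000 / heartbeatsPerSecond) + " ms");
      }
    }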

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMerger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMerger.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMerger.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMerger.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestMerger {
+
+  @Test
+  public void testUncompressed() throws IOException {
+    testMergeShouldReturnProperProgress(getUncompressedSegments());
+  }
+  
+  @Test
+  public void testCompressed() throws IOException {
+    testMergeShouldReturnProperProgress(getCompressedSegments());
+  }
+  
+  @SuppressWarnings( { "deprecation", "unchecked" })
+  public void testMergeShouldReturnProperProgress(
+      List<Segment<Text, Text>> segments) throws IOException {
+    Configuration conf = new Configuration();
+    JobConf jobConf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path tmpDir = new Path("localpath");
+    Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
+    Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
+    RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
+    Counter readsCounter = new Counter();
+    Counter writesCounter = new Counter();
+    RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
+        valueClass, segments, 2, tmpDir, comparator, getReporter(),
+        readsCounter, writesCounter);
+    Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
+  }
+
+  private Progressable getReporter() {
+    Progressable reporter = new Progressable() {
+      @Override
+      public void progress() {
+      }
+    };
+    return reporter;
+  }
+
+  private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
+    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
+    for (int i = 1; i < 10; i++) {
+      segments.add(getUncompressedSegment(i));
+    }
+    return segments;
+  }
+
+  private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
+    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
+    for (int i = 1; i < 10; i++) {
+      segments.add(getCompressedSegment(i));
+    }
+    return segments;
+  }
+  
+  private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
+    return new Segment<Text, Text>(getReader(i), false);
+  }
+  
+  private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
+    return new Segment<Text, Text>(getReader(i), false, 3000l);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Reader<Text, Text> getReader(int i) throws IOException {
+    Reader<Text, Text> readerMock = mock(Reader.class);
+    when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l).thenReturn(
+        20l);
+    when(
+        readerMock.next(any(DataInputBuffer.class), any(DataInputBuffer.class)))
+        .thenAnswer(getAnswer("Segment" + i));
+    return readerMock;
+  }
+
+  private Answer<?> getAnswer(final String segmentName) {
+    return new Answer<Object>() {
+      int i = 0;
+
+      public Boolean answer(InvocationOnMock invocation) {
+        Object[] args = invocation.getArguments();
+        DataInputBuffer key = (DataInputBuffer) args[0];
+        DataInputBuffer value = (DataInputBuffer) args[1];
+        if (i++ == 2) {
+          return false;
+        }
+        key.reset(("Segement Key " + segmentName + i).getBytes(), 20);
+        value.reset(("Segement Value" + segmentName + i).getBytes(), 20);
+        return true;
+      }
+    };
+  }
+}
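
TestMerger drives Merger.merge() entirely with Mockito mocks: each mocked
IFile.Reader reports positions 0, 10, 20 via consecutive stubbing, and the
Answer returns two key/value pairs before signalling end-of-segment, so the
merge should finish with progress 1.0. A self-contained sketch of the
consecutive-stubbing behaviour relied on here (the Callable is illustrative,
not from the patch):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.concurrent.Callable;

    public class ConsecutiveStubDemo {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) throws Exception {
        Callable<Long> positions = (Callable<Long>) mock(Callable.class);
        // Each call consumes the next stubbed value; the last one repeats.
        when(positions.call()).thenReturn(0L).thenReturn(10L).thenReturn(20L);
        System.out.println(positions.call());  // 0
        System.out.println(positions.call());  // 10
        System.out.println(positions.call());  // 20
        System.out.println(positions.call());  // 20 (last value repeats)
      }
    }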

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMiniMRClientCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMiniMRClientCluster.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMiniMRClientCluster.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestMiniMRClientCluster.java Fri Jun 21 06:37:27 2013
@@ -99,6 +99,29 @@ public class TestMiniMRClientCluster {
     validateCounters(job.getCounters(), 5, 25, 5, 5);
   }
 
+  @Test
+  public void testRestart() throws Exception {
+
+    String jobTrackerAddr1 = mrCluster.getConfig().get("mapred.job.tracker");
+    String taskTrackerAddr1 = mrCluster.getConfig().get(
+        "mapred.task.tracker.report.address");
+
+    mrCluster.restart();
+
+    String jobTrackerAddr2 = mrCluster.getConfig().get("mapred.job.tracker");
+    String taskTrackerAddr2 = mrCluster.getConfig().get(
+        "mapred.task.tracker.report.address");
+
+    assertEquals("Address before restart: " + jobTrackerAddr1
+        + " is different from new address: " + jobTrackerAddr2,
+        jobTrackerAddr1, jobTrackerAddr2);
+
+    assertEquals("Address before restart: " + taskTrackerAddr1
+        + " is different from new address: " + taskTrackerAddr2,
+        taskTrackerAddr1, taskTrackerAddr2);
+
+  }
+
   private void validateCounters(Counters counters, long mapInputRecords,
       long mapOutputRecords, long reduceInputGroups, long reduceOutputRecords) {
     assertEquals("MapInputRecords", mapInputRecords, counters.findCounter(

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestNetworkedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestNetworkedJob.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestNetworkedJob.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestNetworkedJob.java Fri Jun 21 06:37:27 2013
@@ -18,15 +18,16 @@
 
 package org.apache.hadoop.mapred;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.util.List;
 
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapreduce.Job;
 import org.junit.Test;
-import static org.mockito.Mockito.*;
 
 public class TestNetworkedJob {
 
@@ -50,7 +51,6 @@ public class TestNetworkedJob {
     JobProfile mockProf = mock(JobProfile.class);
     new JobClient.NetworkedJob(mockStatus, mockProf, null);
   }
-  
 
   @SuppressWarnings("deprecation")
   @Test
@@ -96,4 +96,19 @@ public class TestNetworkedJob {
     verify(mockClient).getJobCounters(id);
   }
 
+  @Test
+  public void testGetJobStatus() throws IOException {
+    JobID id = new JobID("test", 0);
+
+    JobStatus mockStatus = mock(JobStatus.class);
+    JobProfile mockProf = mock(JobProfile.class);
+    JobSubmissionProtocol mockClient = mock(JobSubmissionProtocol.class);
+
+    when(mockProf.getJobID()).thenReturn(id);
+    when(mockClient.getJobStatus(id)).thenReturn(mockStatus);
+
+    RunningJob rj = new JobClient.NetworkedJob(mockStatus, mockProf, mockClient);
+    assertEquals("Expected getJobStatus() to return the correct status",
+        rj.getJobStatus(), mockStatus);
+  }
 }

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.net.StaticMapping;
+import org.junit.BeforeClass;
+
+public class TestNodeGroupAwareTaskPlacement extends TestCase {
+
+  private static final String rack1[] = new String[] {
+    "/r1"
+  };
+  private static final String nodeGroup1[] = new String[] {
+    "/nodegroup1"
+  };
+  private static final String hosts1[] = new String[] {
+    "host1.nodegroup1.rack1"
+  };
+  private static final String rack2[] = new String[] {
+    "/r1", "/r2"
+  };
+  private static final String nodeGroup2[] = new String[] {
+    "/nodegroup2", "/nodegroup3"
+  };
+  private static final String hosts2[] = new String[] {
+    "host2.nodegroup2.rack1", "host2.nodegroup3.rack2"
+  };
+  private static final String hosts3[] = new String[] {
+    "host2.nodegroup3.rack2"
+  };
+  private static final String nodeGroup3[] = new String[] {
+    "/nodegroup3"
+  };
+  private static final String rack3[] = new String[] {
+    "/r2"
+  };
+  private static final String hosts4[] = new String[] {
+    "host3.nodegroup1.rack1"
+  };
+  private static final String nodeGroup4[] = new String[] {
+    "/nodegroup1"
+  };
+  private static final String rack4[] = new String[] {
+    "/r1"
+  };
+  final Path inDir = new Path("/nodegrouptesting");
+  final Path outputPath = new Path("/output");
+
+  /**
+   * Launches a MR job and tests the job counters against the expected values.
+   * @param jobName The name for the job
+   * @param mr The MR cluster
+   * @param fileSys The FileSystem
+   * @param in Input path
+   * @param out Output path
+   * @param numMaps Number of maps
+   * @param otherLocalMaps Expected value of other local maps
+   * @param rackLocalMaps Expected value of rack local maps
+   * @param nodeGroupLocalMaps Expected value of nodeGroup local maps
+   * @param dataLocalMaps Expected value of data(node) local maps
+   * @param jobConfig Configuration for running job
+   */
+  static void launchJobAndTestCounters(String jobName, MiniMRCluster mr,
+                                       FileSystem fileSys, Path in, Path out,
+                                       int numMaps, int otherLocalMaps,
+                                       int rackLocalMaps, int nodeGroupLocalMaps,
+                                       int dataLocalMaps, JobConf jobConfig)
+  throws IOException {
+    JobConf jobConf = mr.createJobConf(jobConfig);
+    if (fileSys.exists(out)) {
+        fileSys.delete(out, true);
+    }
+    RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
+    Counters counters = job.getCounters();
+    assertEquals("Number of local maps",
+            counters.getCounter(JobInProgress.Counter.OTHER_LOCAL_MAPS), otherLocalMaps);
+    assertEquals("Number of Data-local maps",
+            counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS),
+                                dataLocalMaps);
+    assertEquals("Number of NodeGroup-local maps",
+            counters.getCounter(JobInProgress.Counter.NODEGROUP_LOCAL_MAPS),
+                                nodeGroupLocalMaps);
+    assertEquals("Number of Rack-local maps",
+            counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS),
+                                rackLocalMaps);
+
+    mr.waitUntilIdle();
+    mr.shutdown();
+  }
+
+  @BeforeClass
+  public void setUp(){
+    // map host to related locations
+    StaticMapping.addNodeToRack(hosts1[0], rack1[0]+nodeGroup1[0]);
+    StaticMapping.addNodeToRack(hosts2[0], rack2[0]+nodeGroup2[0]);
+    StaticMapping.addNodeToRack(hosts2[1], rack2[1]+nodeGroup2[1]);
+    StaticMapping.addNodeToRack(hosts4[0], rack4[0]+nodeGroup4[0]);
+  }
+
+  public void testTaskPlacement() throws IOException {
+    String namenode = null;
+    MiniDFSClusterWithNodeGroup dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    String testName = "TestForNodeGroupAwareness";
+    try {
+      final int taskTrackers = 1;
+
+      /* Start 4 datanodes, two in rack r1/nodegroup1, one in r1/nodegroup2 and
+       * the other one in r2/nodegroup3. Create three
+       * files (splits).
+       * 1) file1, just after starting the datanode on r1/nodegroup1, with
+       *    a repl factor of 1, and,
+       * 2) file2 & file3 after starting the two datanodes in r1/nodegroup2 and
+       *    r2/nodegroup3, with a repl factor of 3.
+       * 3) start the last data node (datanode4) in r1/nodegroup1
+       * At the end, file1 will be present on only datanode1, and, file2 and
+       * file3, will be present on all datanodes except datanode4.
+       */
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+
+      conf.set("dfs.block.replicator.classname",
+          "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup");
+
+      conf.set("net.topology.impl",
+          "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+
+      conf.setBoolean("net.topology.nodegroup.aware", true);
+
+      conf.setBoolean("mapred.jobtracker.nodegroup.aware", true);
+      conf.setInt("mapred.task.cache.levels", 3);
+
+      conf.set("mapred.jobtracker.jobSchedulable",
+          "org.apache.hadoop.mapred.JobSchedulableWithNodeGroup");
+
+      JobConf jobConf = new JobConf(conf);
+
+      MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroup1);
+      // start the dfs cluster with datanode1 only.
+      dfs = new MiniDFSClusterWithNodeGroup(0, conf, 1,
+                true, true, null, rack1, hosts1, null);
+
+      dfs.waitActive();
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      // write file1 on datanode1 with 1 replica
+      UtilsForTests.writeFile(
+          dfs.getNameNode(), conf, new Path(inDir + "/file1"), (short)1);
+      // start another two datanodes (2 and 3)
+      dfs.startDataNodes(conf, 2, true, null, rack2, nodeGroup2, hosts2, null);
+
+      dfs.waitActive();
+      // write two files with 3 replica, so each datanodes will have one replica
+      // of file2 and file3
+      UtilsForTests.writeFile(
+          dfs.getNameNode(), conf, new Path(inDir + "/file2"), (short)3);
+      UtilsForTests.writeFile(
+          dfs.getNameNode(), conf, new Path(inDir + "/file3"), (short)3);
+
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
+                 (dfs.getFileSystem()).getUri().getPort();
+      /* Run a job with the (only) tasktracker, which is under r2/nodegroup3,
+       * and check the task placement: how many data/nodegroup/rack-local
+       * maps it runs. The tasktracker's hostname is set to the same as datanode3's.
+       */
+      mr = new MiniMRClusterWithNodeGroup(taskTrackers, namenode, 1, rack3,
+          nodeGroup3, hosts3, jobConf);
+      /* The job is configured with three maps since there are three
+       * (non-splittable) files. On rack2 there are two files, both with a
+       * replication factor of three. The blocks for those files must
+       * therefore be present on all the datanodes (except datanode4), in
+       * particular on datanode3 in rack2. The third input file is pulled
+       * from rack1, thus the result should be 2 data-local maps.
+       */
+      launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
+          0, 0, 2, jobConf);
+      mr.shutdown();
+
+      /* Run a job with the (only)tasktracker on datanode4.
+       */
+      mr = new MiniMRClusterWithNodeGroup(taskTrackers, namenode, 1, rack4,
+          nodeGroup4, hosts4, jobConf);
+
+      /* The job is configured with three maps since there are three
+       * (non-splittable) files. Given the way replication was set up while
+       * creating the files, all three files are present on datanode1, which
+       * is in the same nodegroup as datanode4, where the only tasktracker
+       * runs. Thus, the result should be 3 nodegroup-local maps.
+       * The MapReduce cluster has only 1 node, host4, with no datanode
+       * running on that host, so this verifies that in the compute/data
+       * node separation case, task scheduling can still achieve
+       * nodegroup-level locality.
+       */
+      launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
+          0, 3, 0, jobConf);
+      mr.shutdown();
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
+                              int numMaps, String jobName) throws IOException {
+    jobConf.setJobName(jobName);
+    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    FileOutputFormat.setOutputPath(jobConf, outputPath);
+    jobConf.setMapperClass(IdentityMapper.class);
+    jobConf.setReducerClass(IdentityReducer.class);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+    jobConf.setNumMapTasks(numMaps);
+    jobConf.setNumReduceTasks(0);
+    jobConf.setJar("build/test/testjar/testjob.jar");
+    return JobClient.runJob(jobConf);
+  }
+}
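
The setUp() method above builds a three-level topology by concatenating the
rack path and the node-group path into a single network location before
registering each host, and mapred.task.cache.levels=3 lets the scheduler
match tasks at node, node-group, and rack level in that order. A small
illustrative sketch of the registration convention (it mirrors setUp(); no
new behaviour is implied):

    import org.apache.hadoop.net.StaticMapping;

    public class TopologySketch {
      public static void main(String[] args) {
        // Rack and node-group compose into one slash-separated location,
        // e.g. "host1.nodegroup1.rack1" resolves to "/r1/nodegroup1".
        StaticMapping.addNodeToRack("host1.nodegroup1.rack1", "/r1" + "/nodegroup1");
        StaticMapping.addNodeToRack("host2.nodegroup2.rack1", "/r1" + "/nodegroup2");
        StaticMapping.addNodeToRack("host2.nodegroup3.rack2", "/r2" + "/nodegroup3");
      }
    }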

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestNonLocalJobJarSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestNonLocalJobJarSubmission.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestNonLocalJobJarSubmission.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestNonLocalJobJarSubmission.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.io.InputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Iterator;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.Scanner;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+
+public class TestNonLocalJobJarSubmission extends ClusterMapReduceTestCase {
+  
+  public void testNonLocalJobJarSubmission() throws Exception {
+    FileSystem hdfs = getFileSystem();
+    
+    OutputStream os = hdfs.create(new Path(getInputDir(), "dummy.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    wr.write("dummy\n");
+    wr.close();
+    
+    String jobJarPath = createAndUploadJobJar(hdfs);
+    
+    JobConf conf = createJobConf();
+    conf.setJobName("mr");
+    conf.setJobPriority(JobPriority.HIGH);
+    
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setMapOutputKeyClass(IntWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(NullWritable.class);
+
+    conf.setMapperClass(ClasspathMapper.class);
+    conf.setReducerClass(ClasspathReducer.class);
+
+    FileInputFormat.setInputPaths(conf, getInputDir());
+    FileOutputFormat.setOutputPath(conf, getOutputDir());
+    
+    conf.setJar(jobJarPath);
+    
+    String jobID = JobClient.runJob(conf).getID().toString();
+    
+    verifyClasspath(hdfs, jobID);
+  }
+  
+  private String createAndUploadJobJar(FileSystem hdfs) throws Exception {
+    File tmp =
+        new File(System.getProperty("test.build.data", "/tmp"), getClass()
+            .getSimpleName());
+    if (!tmp.exists()) {
+      tmp.mkdirs();
+    }
+    File jobJarFile = new File(tmp, "jobjar-on-local.jar");
+    JarOutputStream jstream =
+        new JarOutputStream(new FileOutputStream(jobJarFile));
+    ZipEntry ze = new ZipEntry("lib/lib1.jar");
+    jstream.putNextEntry(ze);
+    jstream.closeEntry();
+    ze = new ZipEntry("lib/lib2.jar");
+    jstream.putNextEntry(ze);
+    jstream.closeEntry();
+    jstream.finish();
+    jstream.close();
+    
+    Path jobJarPath = new Path(getTestRootDir().makeQualified(hdfs), "jobjar-on-hdfs.jar");
+    hdfs.moveFromLocalFile(new Path(jobJarFile.toURI().toString()), jobJarPath);
+    if (jobJarFile.exists()) {  // just to make sure
+      jobJarFile.delete();
+    }
+    return jobJarPath.toUri().toString();
+  }
+
+  private void verifyClasspath(FileSystem hdfs, String jobID) throws Exception {
+    boolean containsLib1Jar = false;
+    String lib1JarStr = "jobcache/" + jobID + "/jars/lib/lib1.jar";
+    boolean containsLib2Jar = false;
+    String lib2JarStr = "jobcache/" + jobID + "/jars/lib/lib2.jar";
+    
+    FileStatus[] fstats = hdfs.listStatus(getOutputDir());
+    for (FileStatus fstat : fstats) {
+      Path p = fstat.getPath();
+      if (hdfs.isFile(p) && p.getName().startsWith("part-")) {
+        InputStream is = hdfs.open(p);
+        Scanner sc = new Scanner(is);
+        while (sc.hasNextLine()) {
+          String line = sc.nextLine();
+          containsLib1Jar = (containsLib1Jar || line.endsWith(lib1JarStr));
+          containsLib2Jar = (containsLib2Jar || line.endsWith(lib2JarStr));
+        }
+        sc.close();
+        is.close();
+      }
+    }
+    
+    assertTrue("lib/lib1.jar should have been unzipped from the job jar and "
+            + "added to the classpath but was not", containsLib1Jar);
+    assertTrue("lib/lib2.jar should have been unzipped from the job jar and "
+            + "added to the classpath but was not", containsLib2Jar);
+  }
+  
+  static class ClasspathMapper 
+      implements Mapper<LongWritable, Text, IntWritable, Text> {
+    
+    private static final IntWritable zero = new IntWritable(0);
+      
+    public void configure(JobConf job) {
+    }
+
+    public void map(LongWritable key, Text val,
+                    OutputCollector<IntWritable, Text> out,
+                    Reporter reporter) throws IOException {
+        
+      ClassLoader applicationClassLoader = this.getClass().getClassLoader();
+      if (applicationClassLoader == null) {
+          applicationClassLoader = ClassLoader.getSystemClassLoader();
+      }
+      URL[] urls = ((URLClassLoader)applicationClassLoader).getURLs();
+      for(URL url : urls) {
+          out.collect(zero, new Text(url.toString()));
+      }
+    }
+    
+    public void close() {
+    }
+  }
+  
+  static class ClasspathReducer 
+      implements Reducer<IntWritable, Text, Text, NullWritable> {
+    
+    public void configure(JobConf job) {
+    }
+
+    public void reduce(IntWritable key, Iterator<Text> it,
+                       OutputCollector<Text, NullWritable> out,
+                       Reporter reporter) throws IOException {
+      while (it.hasNext()) {
+        out.collect(it.next(), NullWritable.get());
+      }
+    }
+    
+    public void close() {
+    }
+  }
+}
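
The test above asserts that lib/*.jar entries inside a job jar submitted from
HDFS are unpacked under jobcache/<jobid>/jars/lib on the tasktracker and show
up on the task's classpath (which ClasspathMapper dumps from its
URLClassLoader). A hedged sketch of building such a job jar with nested lib
entries, as createAndUploadJobJar() does (file names are illustrative):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.util.jar.JarOutputStream;
    import java.util.zip.ZipEntry;

    public class JobJarBuilder {
      public static void main(String[] args) throws Exception {
        JarOutputStream jos = new JarOutputStream(
            new FileOutputStream(new File("jobjar-demo.jar")));
        // Each lib/*.jar entry is extracted to jobcache/<jobid>/jars/lib at
        // task launch and appended to the task classpath.
        jos.putNextEntry(new ZipEntry("lib/lib1.jar"));
        jos.closeEntry();                  // empty entry, as in the test
        jos.putNextEntry(new ZipEntry("lib/lib2.jar"));
        jos.closeEntry();
        jos.finish();
        jos.close();
      }
    }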

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Fri Jun 21 06:37:27 2013
@@ -183,6 +183,12 @@ public class TestParallelInitialization 
         listener.jobAdded(job);
       }
     }
+
+    @Override
+    public boolean isInSafeMode() {
+      // TODO Auto-generated method stub
+      return false;
+    }
   }
   
   protected JobConf jobConf;

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRawHistoryFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRawHistoryFile.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRawHistoryFile.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRawHistoryFile.java Fri Jun 21 06:37:27 2013
@@ -35,14 +35,15 @@ import org.junit.Assert;
 import java.io.IOException;
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.InputStream;
 import java.net.HttpURLConnection;
 
 import junit.framework.TestCase;
 
 public class TestRawHistoryFile extends TestCase {
   private static final Log LOG = LogFactory.getLog(TestRawHistoryFile.class);
-
+  private String inputPath = System.getProperty("test.build.data",
+      "build/test/data") + "/TestRawHistoryFile";
+  
   public void testRetrieveHistoryFile() {
 
     MiniMRCluster mrCluster = null;
@@ -58,7 +59,7 @@ public class TestRawHistoryFile extends 
           null, null, conf);
 
       conf = mrCluster.createJobConf();
-      createInputFile(conf, "/tmp/input");
+      createInputFile(conf, inputPath);
 
       RunningJob job = runJob(conf);
       LOG.info("Job details: " + job);
@@ -98,7 +99,7 @@ public class TestRawHistoryFile extends 
           null, null, conf);
 
       conf = mrCluster.createJobConf();
-      createInputFile(conf, "/tmp/input");
+      createInputFile(conf, inputPath);
 
       RunningJob job = submitJob(conf);
       LOG.info("Job details: " + job);
@@ -190,7 +191,7 @@ public class TestRawHistoryFile extends 
     conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
     conf.setReducerClass(org.apache.hadoop.mapred.lib.IdentityReducer.class);
 
-    FileInputFormat.setInputPaths(conf, "/tmp/input");
+    FileInputFormat.setInputPaths(conf, inputPath);
   }
 
   private String saveHistoryFile(String url) throws IOException {

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Fri Jun 21 06:37:27 2013
@@ -20,37 +20,81 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-
-import junit.framework.TestCase;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
 import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
 import org.apache.hadoop.mapred.QueueManager.QueueACL;
-import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * Test whether the {@link RecoveryManager} is able to tolerate job-recovery 
  * failures and the jobtracker is able to tolerate {@link RecoveryManager}
  * failure.
  */
-/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
- */
-@Ignore
-public class TestRecoveryManager extends TestCase {
+
+public class TestRecoveryManager {
   private static final Log LOG = 
     LogFactory.getLog(TestRecoveryManager.class);
   private static final Path TEST_DIR = 
     new Path(System.getProperty("test.build.data", "/tmp"), 
              "test-recovery-manager");
   
+  private FileSystem fs;
+  private MiniDFSCluster dfs;
+  private MiniMRCluster mr;
+
+  @Before
+  public void setUp() throws IOException {
+    fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true);
+  }
+
+  private void startCluster() throws IOException {
+    startCluster(new JobConf());
+  }
+
+  private void startCluster(JobConf conf) throws IOException {
+    mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      if (mr != null) {
+        ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
+            .getClusterStatus(false);
+        if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
+          mr.shutdown();
+        }
+      }
+    } finally {
+      mr = null;
+      
+      try {
+        if (dfs != null) {
+          dfs.shutdown();
+          dfs = null;
+        }
+      } finally {
+        dfs = null;
+      }
+    }
+  }
+  
   /**
    * Tests the {@link JobTracker} against the exceptions thrown in 
    * {@link JobTracker.RecoveryManager}. It does the following :
@@ -60,21 +104,14 @@ public class TestRecoveryManager extends
    *  - restarts the jobtracker
   *  - checks if the jobtracker starts normally
    */
-  public void testJobTracker() throws Exception {
+  @Test(timeout=120000)
+  public void testJobTrackerRestartsWithMissingJobFile() throws Exception {
     LOG.info("Testing jobtracker restart with faulty job");
+    startCluster();
     String signalFile = new Path(TEST_DIR, "signal").toString();
-    JobConf conf = new JobConf();
-    
-    FileSystem fs = FileSystem.get(new Configuration());
-    fs.delete(TEST_DIR, true); // cleanup
-    
-    conf.set("mapred.jobtracker.job.history.block.size", "1024");
-    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
-    
-    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
-    
+
     JobConf job1 = mr.createJobConf();
-    
+
     UtilsForTests.configureWaitingJobConf(job1, 
         new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0, 
         "test-recovery-manager", signalFile, signalFile);
@@ -94,7 +131,7 @@ public class TestRecoveryManager extends
         new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0, 
         "test-recovery-manager", signalFile, signalFile);
     
-    // submit the faulty job
+    // submit another job
     RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
     LOG.info("Submitted job " + rJob2.getID());
     
@@ -112,7 +149,7 @@ public class TestRecoveryManager extends
     Path jobFile = 
       new Path(sysDir, rJob1.getID().toString() + "/" + JobTracker.JOB_INFO_FILE);
     LOG.info("Deleting job token file : " + jobFile.toString());
-    fs.delete(jobFile, false); // delete the job.xml file
+    Assert.assertTrue(fs.delete(jobFile, false)); // delete the job.xml file
     
     // create the job.xml file with 1 bytes
     FSDataOutputStream out = fs.create(jobFile);
@@ -125,17 +162,173 @@ public class TestRecoveryManager extends
     // start the jobtracker
     LOG.info("Starting jobtracker");
     mr.startJobTracker();
-    ClusterStatus status = 
-      mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false);
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    ClusterStatus status = jobtracker.getClusterStatus(false);
     
     // check if the jobtracker came up or not
-    assertEquals("JobTracker crashed!", 
+    Assert.assertEquals("JobTracker crashed!", 
                  JobTracker.State.RUNNING, status.getJobTrackerState());
-    
-    mr.shutdown();
+
+    // wait for job 2 to complete
+    JobInProgress jip = jobtracker.getJob(rJob2.getID());
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
+      // Signaling Map task to complete
+      fs.create(new Path(TEST_DIR, "signal"));
+      UtilsForTests.waitFor(100);
+    }
+    Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
   }
   
   /**
+   * Tests the re-submission of a job when the jobtracker dies or restarts:
+   *  - submits a job and lets it be inited.
+   *  - kills the jobtracker
+   *  - checks if the jobtracker starts normally and the job is recovered
+   */
+  @Test(timeout=120000)
+  public void testJobResubmission() throws Exception {
+    LOG.info("Testing Job Resubmission");
+    startCluster();
+    String signalFile = new Path(TEST_DIR, "signal").toString();
+
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf()
+        .setBoolean("mapred.jobtracker.restart.recover", true);
+
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    JobConf job1 = mr.createJobConf();
+    UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"),
+        new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+        signalFile);
+
+    JobClient jc1 = new JobClient(job1);
+    RunningJob rJob1 = jc1.submitJob(job1);
+    LOG.info("Submitted first job " + rJob1.getID());
+
+    while (rJob1.mapProgress() < 0.5f) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+      UtilsForTests.waitFor(100);
+    }
+
+    // now submit job2
+    JobConf job2 = mr.createJobConf();
+
+    String signalFile1 = new Path(TEST_DIR, "signal1").toString();
+    UtilsForTests.configureWaitingJobConf(job2, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0, 
+        "test-recovery-manager", signalFile1, signalFile1);
+    job2.setBoolean(JobConf.MAPREDUCE_RECOVER_JOB, false); // don't recover
+    
+    // submit the job
+    RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
+    LOG.info("Submitted job " + rJob2.getID());
+    
+    // wait for it to init
+    JobInProgress jip2 = jobtracker.getJob(rJob2.getID());
+    
+    while (!jip2.inited()) {
+      LOG.info("Waiting for job " + jip2.getJobID() + " to be inited");
+      UtilsForTests.waitFor(100);
+    }
+
+    // kill the jobtracker
+    LOG.info("Stopping jobtracker");
+    mr.stopJobTracker();
+
+    // start the jobtracker
+    LOG.info("Starting jobtracker");
+    mr.startJobTracker();
+    UtilsForTests.waitForJobTracker(jc1);
+
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    // assert that only job1 is recovered by the jobtracker
+    Assert.assertEquals("Resubmission failed ", 1, 
+        jobtracker.getAllJobs().length);
+
+    // wait for job 1 to complete
+    JobInProgress jip = jobtracker.getJob(rJob1.getID());
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
+      // Signaling Map task to complete
+      fs.create(new Path(TEST_DIR, "signal"));
+      UtilsForTests.waitFor(100);
+    }
+    Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
+    Assert.assertTrue("Job should be cleaned up", !fs.exists(new Path(job1.get("mapreduce.job.dir"))));
+    Assert.assertTrue("Job should be cleaned up", !fs.exists(new Path(job2.get("mapreduce.job.dir"))));
+  }
+
+  public static class TestJobTrackerInstrumentation extends JobTrackerInstrumentation {
+    static CountDownLatch finalizeCall = new CountDownLatch(1);
+
+    public TestJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
+      super(jt, conf);
+    }
+
+    public void finalizeJob(JobConf conf, JobID id) {
+      if (finalizeCall.getCount() == 0) {
+        return;
+      }
+      finalizeCall.countDown();
+      throw new IllegalStateException("Controlled error finalizing job");
+    }
+  }
+
+  @Test
+  public void testJobTrackerRestartBeforeJobFinalization() throws Exception {
+    LOG.info("Testing Job Resubmission");
+
+    JobConf conf = new JobConf();
+    // make sure that the jobtracker is in recovery mode
+    conf.setBoolean("mapred.jobtracker.restart.recover", true);
+
+    // use a test JobTrackerInstrumentation implementation to shut down
+    // the jobtracker after the tasks have all completed, but
+    // before the job is finalized and check that it can be recovered correctly
+    conf.setClass("mapred.jobtracker.instrumentation", TestJobTrackerInstrumentation.class,
+            JobTrackerInstrumentation.class);
+
+    startCluster(conf);
+
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    SleepJob job = new SleepJob();
+    job.setConf(mr.createJobConf());
+    JobConf job1 = job.setupJobConf(1, 0, 1, 1, 1, 1);
+    JobClient jc = new JobClient(job1);
+    RunningJob rJob1 = jc.submitJob(job1);
+    LOG.info("Submitted first job " + rJob1.getID());
+
+    TestJobTrackerInstrumentation.finalizeCall.await();
+
+    // kill the jobtracker
+    LOG.info("Stopping jobtracker");
+    mr.stopJobTracker();
+
+    // start the jobtracker
+    LOG.info("Starting jobtracker");
+    mr.startJobTracker();
+    UtilsForTests.waitForJobTracker(jc);
+
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    // assert that job is recovered by the jobtracker
+    Assert.assertEquals("Resubmission failed ", 1,
+        jobtracker.getAllJobs().length);
+
+    // wait for job 1 to complete
+    JobInProgress jip = jobtracker.getJob(rJob1.getID());
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
+      UtilsForTests.waitFor(100);
+    }
+    Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
+  }
+
+  /**
    * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown 
    * during recovery. It does the following :
    *  - submits a job with HIGH priority and x tasks
@@ -147,19 +340,15 @@ public class TestRecoveryManager extends
   *  - checks if the jobtracker starts normally and job#2 is recovered while 
    *    job#1 is failed.
    */
-  public void testRecoveryManager() throws Exception {
+  @Test(timeout=120000)
+  public void testJobTrackerRestartWithBadJobs() throws Exception {
     LOG.info("Testing recovery-manager");
+    startCluster();
     String signalFile = new Path(TEST_DIR, "signal").toString();
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf()
+        .setBoolean("mapred.jobtracker.restart.recover", true);
     
-    // clean up
-    FileSystem fs = FileSystem.get(new Configuration());
-    fs.delete(TEST_DIR, true);
-    
-    JobConf conf = new JobConf();
-    conf.set("mapred.jobtracker.job.history.block.size", "1024");
-    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
-    
-    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
     JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
     
     JobConf job1 = mr.createJobConf();
@@ -167,7 +356,7 @@ public class TestRecoveryManager extends
     job1.setJobPriority(JobPriority.HIGH);
     
     UtilsForTests.configureWaitingJobConf(job1, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 30, 0, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 30, 0,
         "test-recovery-manager", signalFile, signalFile);
     
     // submit the faulty job
@@ -179,13 +368,13 @@ public class TestRecoveryManager extends
       LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
       UtilsForTests.waitFor(100);
     }
-    
+
     // now submit job2
     JobConf job2 = mr.createJobConf();
 
     String signalFile1 = new Path(TEST_DIR, "signal1").toString();
     UtilsForTests.configureWaitingJobConf(job2, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 20, 0,
         "test-recovery-manager", signalFile1, signalFile1);
     
     // submit the job
@@ -206,7 +395,7 @@ public class TestRecoveryManager extends
       UserGroupInformation.createUserForTesting("abc", new String[]{"users"});
     
     UtilsForTests.configureWaitingJobConf(job3, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 1, 0, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 1, 0,
         "test-recovery-manager", signalFile, signalFile);
     
     // submit the job
@@ -224,7 +413,7 @@ public class TestRecoveryManager extends
       LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
       UtilsForTests.waitFor(100);
     }
-
+    
     // kill the jobtracker
     LOG.info("Stopping jobtracker");
     mr.stopJobTracker();
@@ -247,21 +436,27 @@ public class TestRecoveryManager extends
     jobtracker = mr.getJobTrackerRunner().getJobTracker();
     
     // assert that job2 is recovered by the jobtracker as job1 would fail
-    assertEquals("Recovery manager failed to tolerate job failures",
-                 2, jobtracker.getAllJobs().length);
+    Assert.assertEquals("Recovery manager failed to tolerate job failures", 1,
+        jobtracker.getAllJobs().length);
     
     // check if the job#1 has failed
     JobStatus status = jobtracker.getJobStatus(rJob1.getID());
-    assertEquals("Faulty job not failed", 
-                 JobStatus.FAILED, status.getRunState());
+    Assert.assertNull("Faulty job should not be resubmitted", status);
     
     jip = jobtracker.getJob(rJob2.getID());
-    assertFalse("Job should be running", jip.isComplete());
+    Assert.assertFalse("Job should be running", jip.isComplete());
     
     status = jobtracker.getJobStatus(rJob3.getID());
-    assertNull("Job should be missing", status);
-    
-    mr.shutdown();
+    Assert.assertNull("Job should be missing because of ACL changed", status);
+
+    // wait for job 2 to complete
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
+      // Signal the map tasks to complete
+      fs.create(new Path(TEST_DIR, "signal1"));
+      UtilsForTests.waitFor(100);
+    }
+    Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
   }
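
The waiting jobs used throughout these tests block their tasks until a signal
file appears on the shared filesystem; a condensed sketch of that handshake,
assembled from calls that appear in this file (paths and job name
hypothetical):

    // Configure a job whose two maps spin until the signal path exists,
    // then release them by creating the file.
    String signal = new Path(TEST_DIR, "signal").toString();
    JobConf job = mr.createJobConf();
    UtilsForTests.configureWaitingJobConf(job,
        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "out"),
        2, 0, "waiting-job", signal, signal);
    RunningJob rJob = new JobClient(job).submitJob(job);
    fs.create(new Path(signal));  // signal the tasks to finish
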
   
   /**
@@ -277,114 +472,79 @@ public class TestRecoveryManager extends
    *   - garble the jobtracker.info file and restart the jobtracker; the
    *     jobtracker should crash.
    */
+  @Test(timeout=120000)
   public void testRestartCount() throws Exception {
-    LOG.info("Testing restart-count");
+    LOG.info("Testing Job Restart Count");
+    startCluster();
     String signalFile = new Path(TEST_DIR, "signal").toString();
-    
-    // clean up
-    FileSystem fs = FileSystem.get(new Configuration());
-    fs.delete(TEST_DIR, true);
-    
-    JobConf conf = new JobConf();
-    conf.set("mapred.jobtracker.job.history.block.size", "1024");
-    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
-    conf.setBoolean("mapred.jobtracker.restart.recover", true);
-    // since there is no need for initing
-    conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
-                  TaskScheduler.class);
-    
-    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
-    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
-    JobClient jc = new JobClient(mr.createJobConf());
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf()
+        .setBoolean("mapred.jobtracker.restart.recover", true);
 
-    // check if the jobtracker info file exists
-    Path infoFile = jobtracker.recoveryManager.getRestartCountFile();
-    assertTrue("Jobtracker infomation is missing", fs.exists(infoFile));
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
 
-    // check if garbling the system files disables the recovery process
-    LOG.info("Stopping jobtracker for testing with system files deleted");
-    mr.stopJobTracker();
-    
-    // delete the info file
-    Path rFile = jobtracker.recoveryManager.getRestartCountFile();
-    fs.delete(rFile,false);
-    
-    // start the jobtracker
-    LOG.info("Starting jobtracker with system files deleted");
-    mr.startJobTracker();
-    
-    UtilsForTests.waitForJobTracker(jc);
-    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    JobConf job1 = mr.createJobConf();
+    // set the high priority
+    job1.setJobPriority(JobPriority.HIGH);
 
-    // check if the recovey is disabled
-    assertFalse("Recovery is not disabled upon missing system files", 
-                jobtracker.recoveryManager.shouldRecover());
-
-    // check if the system dir is sane
-    assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
-    Path tFile = jobtracker.recoveryManager.getTempRestartCountFile();
-    assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
+    UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"),
+        new Path(TEST_DIR, "output7"), 30, 0, "test-restart", signalFile,
+        signalFile);
 
-    // submit a job
-    JobConf job = mr.createJobConf();
-    
-    UtilsForTests.configureWaitingJobConf(job, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 2, 0, 
-        "test-recovery-manager", signalFile, signalFile);
-    
     // submit the job
-    RunningJob rJob = jc.submitJob(job);
-    LOG.info("Submitted first job " + rJob.getID());
+    JobClient jc = new JobClient(job1);
+    RunningJob rJob1 = jc.submitJob(job1);
+    LOG.info("Submitted first job " + rJob1.getID());
 
-    // wait for 1 min
-    UtilsForTests.waitFor(60000);
+    JobInProgress jip = jobtracker.getJob(rJob1.getID());
 
-    // kill the jobtracker multiple times and check if the count is correct
-    for (int i = 1; i <= 5; ++i) {
+    while (!jip.inited()) {
+      LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
+      UtilsForTests.waitFor(100);
+    }
+
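+    // restart the jobtracker twice, checking the restart count each time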
+    for (int i = 1; i <= 2; ++i) {
       LOG.info("Stopping jobtracker for " + i + " time");
       mr.stopJobTracker();
-      
+
       // start the jobtracker
       LOG.info("Starting jobtracker for " + i + " time");
       mr.startJobTracker();
-      
+
       UtilsForTests.waitForJobTracker(jc);
-      
-      // check if the system dir is sane
-      assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
-      assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
-      
+
       jobtracker = mr.getJobTrackerRunner().getJobTracker();
-      JobInProgress jip = jobtracker.getJob(rJob.getID());
-      
+
       // assert if restart count is correct
-      assertEquals("Recovery manager failed to recover restart count",
-                   i, jip.getNumRestarts());
+      // It should always be 0 now: the job is resubmitted on every
+      // restart rather than recovered, so the count is reset each time.
+      Assert.assertEquals("Recovery manager failed to recover restart count", 
+          0, jip.getNumRestarts());
     }
-    
+
     // kill the old job
-    rJob.killJob();
+    rJob1.killJob();
 
     // II. Submit a new job and check if the restart count is 0
-    JobConf job1 = mr.createJobConf();
-    
-    UtilsForTests.configureWaitingJobConf(job1, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0, 
-        "test-recovery-manager", signalFile, signalFile);
+    JobConf job2 = mr.createJobConf();
+
+    UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"),
+        new Path(TEST_DIR, "output8"), 50, 0, "test-restart-manager",
+        signalFile, signalFile);
 
     // submit a new job
-    rJob = jc.submitJob(job1);
-    LOG.info("Submitted first job after restart" + rJob.getID());
+    RunningJob rJob2 = jc.submitJob(job2);
+    LOG.info("Submitted first job after restart" + rJob2.getID());
 
     // assert if restart count is correct
-    JobInProgress jip = jobtracker.getJob(rJob.getID());
-    assertEquals("Restart count for new job is incorrect",
-                 0, jip.getNumRestarts());
+    jip = jobtracker.getJob(rJob2.getID());
+    Assert.assertEquals("Restart count for new job is incorrect", 0, jip
+        .getNumRestarts());
 
     LOG.info("Stopping jobtracker for testing the fs errors");
     mr.stopJobTracker();
 
     // check if system.dir problems during recovery kill the jobtracker
+    Path rFile = jobtracker.recoveryManager.getRestartCountFile();
     fs.delete(rFile, false);
     FSDataOutputStream out = fs.create(rFile);
     out.writeBoolean(true);
@@ -394,18 +554,18 @@ public class TestRecoveryManager extends
     LOG.info("Starting jobtracker with fs errors");
     mr.startJobTracker();
     JobTrackerRunner runner = mr.getJobTrackerRunner();
-    assertFalse("JobTracker is still alive", runner.isActive());
+    Assert.assertFalse("JobTracker is still alive", runner.isActive());
 
-    mr.shutdown();
-  } 
+  }
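
The fs-error scenario at the end of testRestartCount works by replacing
jobtracker.info with bytes the recovery manager cannot parse; the same steps
with the intent spelled out (out.close() assumed, as the hunk above elides it):

    Path rFile = jobtracker.recoveryManager.getRestartCountFile();
    fs.delete(rFile, false);                  // drop the valid info file
    FSDataOutputStream out = fs.create(rFile);
    out.writeBoolean(true);                   // garbage instead of a count
    out.close();
    // on restart the jobtracker fails to read the count and goes down,
    // which is what the runner.isActive() assertion checks
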
 
   /**
    * Test if the jobtracker waits for the info file to be created before 
    * starting.
    */
+  @Test(timeout=120000)
   public void testJobTrackerInfoCreation() throws Exception {
     LOG.info("Testing jobtracker.info file");
-    MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
+    dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
     String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
                       + (dfs.getFileSystem()).getUri().getPort();
     // shut down the data nodes
@@ -418,6 +578,10 @@ public class TestRecoveryManager extends
     conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
 
     JobTracker jobtracker = new JobTracker(conf);
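+    // drive startup by hand: enter safe mode, initialize the filesystem,
+    // leave safe mode, then run the normal jobtracker initialization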
+    jobtracker.setSafeModeInternal(JobTracker.SafeModeAction.SAFEMODE_ENTER);
+    jobtracker.initializeFilesystem();
+    jobtracker.setSafeModeInternal(JobTracker.SafeModeAction.SAFEMODE_LEAVE);
+    jobtracker.initialize();
 
     // now check if updating the restart count works properly
     boolean failed = false;
@@ -426,14 +590,15 @@ public class TestRecoveryManager extends
     } catch (IOException ioe) {
       failed = true;
     }
-    assertTrue("JobTracker created info files without datanodes!!!", failed);
+    Assert.assertTrue("JobTracker created info files without datanodes!!!", 
+        failed);
 
     Path restartFile = jobtracker.recoveryManager.getRestartCountFile();
     Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile();
     FileSystem fs = dfs.getFileSystem();
-    assertFalse("Info file exists after update failure", 
+    Assert.assertFalse("Info file exists after update failure", 
                 fs.exists(restartFile));
-    assertFalse("Temporary restart-file exists after update failure", 
+    Assert.assertFalse("Temporary restart-file exists after update failure", 
                fs.exists(tmpRestartFile));
 
     // start 1 data node
@@ -446,6 +611,206 @@ public class TestRecoveryManager extends
     } catch (IOException ioe) {
       failed = true;
     }
-    assertFalse("JobTracker failed to create info files with datanodes!!!", failed);
+    Assert.assertFalse("JobTracker failed to create info files with datanodes!", 
+        failed);
+  }
+  
+  static void mkdirWithPerms(FileSystem fs, String dir, short mode) throws IOException {
+    Path p = new Path(dir);
+    fs.mkdirs(p);
+    fs.setPermission(p, new FsPermission(mode));
+  }
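
mkdirWithPerms is used by the two resubmission tests below to lay out a
mini-HDFS namespace: a private mapred.system.dir owned by the jobtracker user,
plus world-writable staging and tmp directories so a synthetic test user can
submit jobs. The layout, condensed from the test bodies:

    mkdirWithPerms(fs, conf.get("mapred.system.dir"), (short)0700);
    fs.setOwner(new Path(conf.get("mapred.system.dir")),
        UserGroupInformation.getCurrentUser().getUserName(), "mrgroup");
    mkdirWithPerms(fs, "/user", (short)0777);  // staging root
    mkdirWithPerms(fs, "/tmp", (short)0777);   // job input, output, signals
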
+  
+  @Test(timeout=120000)
+  public void testJobResubmissionAsDifferentUser() throws Exception {
+    LOG.info("Testing Job Resubmission as a different user to the jobtracker");
+    
+    final Path HDFS_TEST_DIR = new Path("/tmp");
+    
+    JobConf conf = new JobConf();
+
+    dfs = new MiniDFSCluster(conf, 1, true, null);
+    fs = dfs.getFileSystem();
+
+    conf.set("mapreduce.jobtracker.staging.root.dir", "/user");
+    conf.set("mapred.system.dir", "/mapred");
+    
+    String mapredSysDir = conf.get("mapred.system.dir");
+    mkdirWithPerms(fs, mapredSysDir, (short)0700);
+    fs.setOwner(new Path(mapredSysDir),
+        UserGroupInformation.getCurrentUser().getUserName(), "mrgroup");
+
+    mkdirWithPerms(fs, "/user", (short)0777);
+    mkdirWithPerms(fs, "/mapred", (short)0777);
+    mkdirWithPerms(fs, "/tmp", (short)0777);
+
+    mr = 
+        new MiniMRCluster(
+            1, dfs.getFileSystem().getUri().toString(), 1, null, null, conf);
+
+    String signalFile = new Path(HDFS_TEST_DIR, "signal").toString();
+
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf()
+        .setBoolean("mapred.jobtracker.restart.recover", true);
+
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    final JobConf job1 = mr.createJobConf();
+    UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
+        new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+        signalFile);
+
+    UserGroupInformation ugi =
+        UserGroupInformation.createUserForTesting("bob", new String[]{"users"});
+    job1.setUser(ugi.getUserName());
+
+    JobClient jc = new JobClient(job1);
+    RunningJob rJob1 = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
+      public RunningJob run() throws IOException {
+        JobClient jc = new JobClient(job1);
+        return jc.submitJob(job1);
+      }
+    });
+    LOG.info("Submitted first job " + rJob1.getID());
+
+    while (rJob1.mapProgress() < 0.5f) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+      UtilsForTests.waitFor(100);
+    }
+
+    // kill the jobtracker
+    LOG.info("Stopping jobtracker");
+    mr.stopJobTracker();
+
+    // Block jobtracker initialization across the restart
+    mr.getJobTrackerConf().setBoolean(
+        JobTracker.JT_INIT_CONFIG_KEY_FOR_TESTS, false);
+
+    // start the jobtracker
+    LOG.info("Starting jobtracker");
+    mr.startJobTracker(false);
+
+    while (!mr.getJobTrackerRunner().isUp()) {
+      Thread.sleep(100);
+    }
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    Assert.assertNotNull(jobtracker);
+    
+    // now check the job status; this should throw
+    // JobTrackerNotYetInitializedException
+    boolean gotJTNYIException = false;
+    try {
+      jobtracker.getJobStatus(rJob1.getID());
+    } catch (JobTrackerNotYetInitializedException jtnyie) {
+      LOG.info("Caught JobTrackerNotYetInitializedException", jtnyie);
+      gotJTNYIException = true;
+    }
+    Assert.assertTrue(gotJTNYIException);
+    
+    jobtracker.setInitDone(true);
+
+    UtilsForTests.waitForJobTracker(jc);
+
+    // assert that job is recovered by the jobtracker
+    Assert.assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length);
+    JobInProgress jip = jobtracker.getJob(rJob1.getID());
+
+    // Signal the map tasks to complete
+    fs.create(new Path(HDFS_TEST_DIR, "signal"));
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
+      UtilsForTests.waitFor(100);
+    }
+    rJob1 = jc.getJob(rJob1.getID());
+    Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
+  }
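
Both the test above and testJobInitError below submit as a synthetic user; the
core of that pattern, reduced to its essentials (the JobConf must be final so
the anonymous PrivilegedExceptionAction can capture it):

    UserGroupInformation ugi =
        UserGroupInformation.createUserForTesting("bob", new String[]{"users"});
    job.setUser(ugi.getUserName());
    RunningJob rJob = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
      public RunningJob run() throws IOException {
        return new JobClient(job).submitJob(job);  // runs as "bob"
      }
    });
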
+
+  @Test(timeout=120000)
+  public void testJobInitError() throws Exception {
+    LOG.info("Testing error during Job submission");
+    
+    final Path HDFS_TEST_DIR = new Path("/tmp");
+    
+    JobConf conf = new JobConf();
+    
+    dfs = new MiniDFSCluster(conf, 1, true, null);
+    fs = dfs.getFileSystem();
+
+    conf.set("mapreduce.jobtracker.staging.root.dir", "/user");
+    conf.set("mapred.system.dir", "/mapred");
+    
+    String mapredSysDir = conf.get("mapred.system.dir");
+    mkdirWithPerms(fs, mapredSysDir, (short)0700);
+    fs.setOwner(new Path(mapredSysDir),
+        UserGroupInformation.getCurrentUser().getUserName(), "mrgroup");
+
+    mkdirWithPerms(fs, "/user", (short)0777);
+    mkdirWithPerms(fs, "/mapred", (short)0777);
+    mkdirWithPerms(fs, "/tmp", (short)0777);
+
+    mr = 
+        new MiniMRCluster(
+            1, dfs.getFileSystem().getUri().toString(), 1, null, null, conf);
+
+    String signalFile = new Path(HDFS_TEST_DIR, "signal").toString();
+
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf()
+        .setBoolean("mapred.jobtracker.restart.recover", true);
+
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    final JobConf job1 = mr.createJobConf();
+    UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
+        new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+        signalFile);
+
+    UserGroupInformation ugi =
+        UserGroupInformation.createUserForTesting("bob", new String[]{"users"});
+    job1.setUser(ugi.getUserName());
+
+    JobClient jc = new JobClient(job1);
+    RunningJob rJob1 = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
+      public RunningJob run() throws IOException {
+        // Job 1 init should fail
+        job1.setBoolean(JobInProgress.JOB_INIT_EXCEPTION, true);
+        JobClient jc = new JobClient(job1);
+        return jc.submitJob(job1);
+      }
+    });
+    LOG.info("Submitted first job " + rJob1.getID());
+
+    // kill the jobtracker
+    LOG.info("Stopping jobtracker");
+    mr.stopJobTracker();
+
+    // start the jobtracker, after turning off job-init exception
+    LOG.info("Starting jobtracker");
+    mr.getJobTrackerConf().setBoolean(
+        JobInProgress.JT_JOB_INIT_EXCEPTION_OVERRIDE, true);
+    mr.startJobTracker(false);
+
+    while (!mr.getJobTrackerRunner().isUp()) {
+      Thread.sleep(100);
+    }
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    Assert.assertNotNull(jobtracker);
+    
+    UtilsForTests.waitForJobTracker(jc);
+
+    // assert that job is recovered by the jobtracker
+    Assert.assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length);
+    JobInProgress jip = jobtracker.getJob(rJob1.getID());
+
+    // Signal the map tasks to complete
+    fs.create(new Path(HDFS_TEST_DIR, "signal"));
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
+      UtilsForTests.waitFor(100);
+    }
+    rJob1 = jc.getJob(rJob1.getID());
+    Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
   }
 }
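
Two test-only switches drive the restart scenarios in this file:
JobTracker.JT_INIT_CONFIG_KEY_FOR_TESTS gates initialization, while
JobInProgress.JOB_INIT_EXCEPTION and JT_JOB_INIT_EXCEPTION_OVERRIDE inject and
then suppress a job-init failure. The gating half, condensed from
testJobResubmissionAsDifferentUser (jobId stands in for a previously submitted
job's ID):

    // Restart with initialization blocked, observe the rejection, then
    // open the gate so recovery can proceed.
    mr.getJobTrackerConf().setBoolean(
        JobTracker.JT_INIT_CONFIG_KEY_FOR_TESTS, false);
    mr.startJobTracker(false);
    while (!mr.getJobTrackerRunner().isUp()) {
      Thread.sleep(100);
    }
    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
    try {
      jt.getJobStatus(jobId);  // clients are rejected until init completes
    } catch (JobTrackerNotYetInitializedException expected) {
    }
    jt.setInitDone(true);
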

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java Fri Jun 21 06:37:27 2013
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestShuffleExceptionCount {
@@ -31,8 +31,8 @@ public class TestShuffleExceptionCount {
   static boolean abortCalled = false;
   private final float epsilon = 1e-5f;
 
-  @BeforeClass
-  public static void initialize() throws Exception {
+  @Before
+  public void initialize() throws Exception {
     abortCalled = false;
   }
     
@@ -52,7 +52,6 @@ public class TestShuffleExceptionCount {
 
   @Test
   public void testCheckException() throws IOException, InterruptedException {
-
     // first test with only MsgRegex set but doesn't match
     String exceptionMsgRegex = "Broken pipe";
     String exceptionStackRegex = null;
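
The switch from @BeforeClass to @Before matters because abortCalled is static,
shared state: @BeforeClass resets it once per class, so an abort recorded by
one test would leak into the next. A minimal JUnit 4 illustration of the
difference (class and field names hypothetical):

    import static org.junit.Assert.assertFalse;

    import org.junit.Before;
    import org.junit.Test;

    public class PerTestResetExample {
      static boolean touched = false;

      @Before  // runs before each @Test; @BeforeClass would run only once
      public void reset() {
        touched = false;
      }

      @Test
      public void writer() { touched = true; }

      @Test
      public void reader() { assertFalse(touched); }  // order-independent
    }
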

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestStatisticsCollector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestStatisticsCollector.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestStatisticsCollector.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestStatisticsCollector.java Fri Jun 21 06:37:27 2013
@@ -21,6 +21,7 @@ import junit.framework.TestCase;
 
 import org.apache.hadoop.mapred.StatisticsCollector.TimeWindow;
 import org.apache.hadoop.mapred.StatisticsCollector.Stat;
+import org.apache.hadoop.mapred.StatisticsCollector.Stat.TimeStat;
 
 public class TestStatisticsCollector extends TestCase{
 
@@ -80,4 +81,60 @@ public class TestStatisticsCollector ext
     assertEquals(95, stat.getValues().get(sincStart).getValue());
   }
 
+  public void testBucketing() throws Exception {
+    StatisticsCollector collector = new StatisticsCollector();
+    TimeWindow window = new TimeWindow("test", 33, 11);
+    // We'll collect 3 buckets before we start removing: 33 / 11 = 3
+    // We'll do 2 updates per bucket (5 is default period): 11 / 5 = 2
+    TimeWindow[] windows = {window};
+    Stat stat1 = collector.createStat("TaskTracker A", windows);
+
+    // TT A starts out with 0 buckets
+    assertEquals(0, stat1.getValues().get(window).getValue());
+    stat1.inc(1);
+    collector.update();
+    assertEquals(0, stat1.getValues().get(window).getValue());
+    stat1.inc(1);
+    collector.update();
+    assertEquals(2, stat1.getValues().get(window).getValue());
+    stat1.inc(1);
+    collector.update();
+    assertEquals(2, stat1.getValues().get(window).getValue());
+    stat1.inc(2);
+    collector.update();
+    assertEquals(2+3, stat1.getValues().get(window).getValue());
+    stat1.inc(0);
+    collector.update();
+    assertEquals(2+3, stat1.getValues().get(window).getValue());
+    stat1.inc(1);
+    // At the next update, we now have 3 buckets for TT A
+    collector.update();
+    assertEquals(2+3+1, stat1.getValues().get(window).getValue());
+    stat1.inc(4);
+    collector.update();
+    assertEquals(2+3+1, stat1.getValues().get(window).getValue());
+    // At the next update, we're going to drop the earliest bucket for TT A and
+    // keep a max of 3 buckets forever
+    collector.update();
+    assertEquals(3+1+4, stat1.getValues().get(window).getValue());
+
+    // A new TaskTracker connects and gets a Stat allocated for it
+    Stat stat2 = collector.createStat("TaskTracker B", windows);
+
+    // TT B starts out with 0 buckets even though TT A already has 3
+    assertEquals(0, stat2.getValues().get(window).getValue());
+    stat2.inc(10);
+    collector.update();
+    assertEquals(3+1+4, stat1.getValues().get(window).getValue());
+    assertEquals(0, stat2.getValues().get(window).getValue());
+    stat1.inc(3);
+    stat2.inc(2);
+    // At the next update, we're going to drop the earliest bucket for TT A
+    // but we shouldn't drop the earliest bucket for TT B because it only
+    // has one bucket so far (which would result in a value of 0 instead of 12)
+    collector.update();
+    assertEquals(1+4+3, stat1.getValues().get(window).getValue());
+    assertEquals(12, stat2.getValues().get(window).getValue());
+  }
+
 }
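
The constants in testBucketing encode the collector's bucketing arithmetic;
assuming the default update period of 5 that the test comments cite:

    int windowSize = 33, bucketSize = 11, updatePeriod = 5;
    int bucketsKept = windowSize / bucketSize;         // 3 buckets per window
    int updatesPerBucket = bucketSize / updatePeriod;  // 2 updates per bucket
    // Once 3 buckets exist, each new bucket evicts the oldest, which is why
    // the expected window value shifts from 2+3+1 to 3+1+4 in the test.
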

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestSubmitJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestSubmitJob.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestSubmitJob.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestSubmitJob.java Fri Jun 21 06:37:27 2013
@@ -287,10 +287,18 @@ public class TestSubmitJob extends TestC
           reduceSignalFile.toString());
       // wait for job to be done
       UtilsForTests.waitTillDone(jClient);
-
-      // check if the staging area is cleaned up
-      LOG.info("Check if job submit dir is cleanup or not");
-      assertFalse(fs.exists(jobSubmitDirpath));
+      
+      // Check that the job submit directory is cleaned up
+      int maxChecks = 20;
+      int sleepMs = 100;
+      for (int i = 0; fs.exists(jobSubmitDirpath) && i < maxChecks; i++) {
+        try {
+          Thread.sleep(sleepMs);
+        } catch (InterruptedException ex) {}
+      }
+      
+      assertFalse("Job submit dir was not cleaned up after " +
+          maxChecks * sleepMs + " ms", fs.exists(jobSubmitDirpath));
     } finally {
       if (mr != null) {
         mr.shutdown();
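
The fix above replaces a one-shot existence check with a bounded poll, since
staging-directory cleanup happens asynchronously after the job finishes; the
same pattern as a reusable helper (name hypothetical):

    // Poll until the path disappears or the retry budget is spent.
    static boolean waitUntilDeleted(FileSystem fs, Path p, int maxChecks,
        long sleepMs) throws IOException {
      for (int i = 0; fs.exists(p) && i < maxChecks; i++) {
        try {
          Thread.sleep(sleepMs);
        } catch (InterruptedException ignored) {
        }
      }
      return !fs.exists(p);
    }
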

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskGraphServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskGraphServlet.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskGraphServlet.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskGraphServlet.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.PrintWriter;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestTaskGraphServlet {
+  @Test
+  public void testTaskGraphServletShouldNotReturnEmptyContentForNoTasks()
+      throws Exception {
+    String jobId = "job_201108291216_0002";
+    HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+    Mockito.doReturn(jobId).when(request).getParameter("jobid");
+    Mockito.doReturn("map").when(request).getParameter("type");
+
+    JobTracker jobTracker = Mockito.mock(JobTracker.class);
+    Mockito.doReturn(new TaskReport[0]).when(jobTracker).getMapTaskReports(
+        JobID.forName(jobId));
+
+    ServletContext servletContext = Mockito.mock(ServletContext.class);
+    Mockito.doReturn(jobTracker).when(servletContext).getAttribute(
+        "job.tracker");
+
+    HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+    PrintWriter printWriter = Mockito.mock(PrintWriter.class);
+    Mockito.doReturn(printWriter).when(response).getWriter();
+
+    TaskGraphServlet taskGraphServlet = getTaskGraphServlet(servletContext);
+
+    taskGraphServlet.doGet(request, response);
+    Mockito.verify(printWriter, Mockito.atLeastOnce()).print("</svg>");
+  }
+
+  private TaskGraphServlet getTaskGraphServlet(
+      final ServletContext servletContext) {
+    TaskGraphServlet taskGraphServlet = new TaskGraphServlet() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public ServletContext getServletContext() {
+        return servletContext;
+      }
+    };
+    return taskGraphServlet;
+  }
+}
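
A note on the design of this test: overriding getServletContext() is what lets
it inject a mocked JobTracker without starting a servlet container, and
verifying that the writer printed "</svg>" guards against the empty-content
regression the test name describes, since even with zero task reports the
servlet must still emit a complete SVG document.
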