Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC

svn commit: r1495297 [39/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerActionCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerActionCleanup.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerActionCleanup.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerActionCleanup.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestTaskTrackerActionCleanup extends TestCase {
+  String jtIdentifier = "201210122331";
+  TestTaskTracker tt = null;
+
+  @Before
+  public void setUp() {
+    tt = new TestTaskTracker();
+  }
+
+  @Test
+  // check that duplicate entries do not make it into the queue
+  public void testDuplicateEntries() throws InterruptedException {
+
+    KillJobAction action = new KillJobAction();
+    // add the action twice
+    tt.addActionToCleanup(action);
+    tt.addActionToCleanup(action);
+
+    checkItemCountInQueue(tt, 1, 1, 0);
+  }
+
+  @Test
+  // make sure all actions for jobs that are still localizing are moved to another queue
+  public void testLocalizingJobActions() throws InterruptedException,
+      IOException {
+
+    // job and attempt ids
+    JobID jobId1 = new JobID(jtIdentifier, 1);
+    JobID jobId2 = new JobID(jtIdentifier, 2);
+
+    TaskAttemptID taskAttemptId1 = new TaskAttemptID(jtIdentifier, 3, true, 1,
+        1);
+    TaskAttemptID taskAttemptId2 = new TaskAttemptID(jtIdentifier, 4, true, 1,
+        1);
+
+    // job actions whose jobs are localizing
+    KillJobAction jAction1 = new KillJobAction(jobId1);
+    RunningJob rjob1 = new RunningJob(jAction1.getJobID());
+    rjob1.localizing = true;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId1, rjob1);
+    tt.addActionToCleanup(jAction1);
+
+    KillJobAction jAction2 = new KillJobAction(jobId2);
+    RunningJob rjob2 = new RunningJob(jAction2.getJobID());
+    rjob2.localizing = true;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId2, rjob2);
+    tt.addActionToCleanup(jAction2);
+
+    // task actions whose jobs are localizing
+    KillTaskAction tAction1 = new KillTaskAction(taskAttemptId1);
+    RunningJob rjob3 = new RunningJob(tAction1.getTaskID().getJobID());
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob3.getJobID(), rjob3);
+    rjob3.localizing = true;
+    tt.addActionToCleanup(tAction1);
+
+    KillTaskAction tAction2 = new KillTaskAction(taskAttemptId2);
+    RunningJob rjob4 = new RunningJob(tAction2.getTaskID().getJobID());
+    rjob4.localizing = true;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob4.getJobID(), rjob4);
+    tt.addActionToCleanup(tAction2);
+
+    // before running task clean up, check the queue
+    checkItemCountInQueue(tt, 4, 4, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after one round the first task gets moved to the localizing list
+    checkItemCountInQueue(tt, 4, 3, 1);
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after another round the 2nd task gets moved to the localizing list
+    checkItemCountInQueue(tt, 4, 2, 2);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after another round the 3rd task gets moved to the localizing list
+    checkItemCountInQueue(tt, 4, 1, 3);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after another round the 4th task gets moved to the localizing list
+    checkItemCountInQueue(tt, 4, 0, 4);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // this is a no-op; there is a sleep here but we can't really test it
+    checkItemCountInQueue(tt, 4, 0, 4);
+  }
+
+  @Test
+  // test when all items in the queue are active
+  public void testAllActiveJobActions() throws InterruptedException,
+      IOException {
+    // job and attempt ids
+    JobID jobId1 = new JobID(jtIdentifier, 1);
+    JobID jobId2 = new JobID(jtIdentifier, 2);
+
+    TaskAttemptID taskAttemptId1 = new TaskAttemptID(jtIdentifier, 3, true, 1,
+        1);
+    TaskAttemptID taskAttemptId2 = new TaskAttemptID(jtIdentifier, 4, true, 1,
+        1);
+
+    // job actions whose jobs are not localizing
+    KillJobAction jAction1 = new KillJobAction(jobId1);
+    RunningJob rjob1 = new RunningJob(jAction1.getJobID());
+    rjob1.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId1, rjob1);
+    tt.addActionToCleanup(jAction1);
+
+    KillJobAction jAction2 = new KillJobAction(jobId2);
+    RunningJob rjob2 = new RunningJob(jAction2.getJobID());
+    rjob2.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId2, rjob2);
+    tt.addActionToCleanup(jAction2);
+
+    // task actions whose jobs are not localizing
+    KillTaskAction tAction1 = new KillTaskAction(taskAttemptId1);
+    RunningJob rjob3 = new RunningJob(tAction1.getTaskID().getJobID());
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob3.getJobID(), rjob3);
+    rjob3.localizing = false;
+    tt.addActionToCleanup(tAction1);
+
+    KillTaskAction tAction2 = new KillTaskAction(taskAttemptId2);
+    RunningJob rjob4 = new RunningJob(tAction2.getTaskID().getJobID());
+    rjob4.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob4.getJobID(), rjob4);
+    tt.addActionToCleanup(tAction2);
+
+    // before running task clean up, check the queue
+    checkItemCountInQueue(tt, 4, 4, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after one round the first task gets killed
+    checkItemCountInQueue(tt, 3, 3, 0);
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after another round the 2nd task gets killed
+    checkItemCountInQueue(tt, 2, 2, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after another round the 3rd task gets killed
+    checkItemCountInQueue(tt, 1, 1, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after another round the 4th task gets killed
+    checkItemCountInQueue(tt, 0, 0, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // this is a no-op; there is a sleep here but we can't really test it
+    checkItemCountInQueue(tt, 0, 0, 0);
+  }
+
+  @Test
+  // test when some items in the queue are localizing and some are not
+  public void testMixedJobActions() throws InterruptedException, IOException {
+    // job and attempt ids
+    JobID jobId1 = new JobID(jtIdentifier, 1);
+    JobID jobId2 = new JobID(jtIdentifier, 2);
+
+    TaskAttemptID taskAttemptId1 = new TaskAttemptID(jtIdentifier, 3, true, 1,
+        1);
+    TaskAttemptID taskAttemptId2 = new TaskAttemptID(jtIdentifier, 4, true, 1,
+        1);
+
+    // job actions: the first is localizing, the second is not
+    KillJobAction jAction1 = new KillJobAction(jobId1);
+    RunningJob rjob1 = new RunningJob(jAction1.getJobID());
+    rjob1.localizing = true;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId1, rjob1);
+    tt.addActionToCleanup(jAction1);
+
+    KillJobAction jAction2 = new KillJobAction(jobId2);
+    RunningJob rjob2 = new RunningJob(jAction2.getJobID());
+    rjob2.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId2, rjob2);
+    tt.addActionToCleanup(jAction2);
+
+    // task actions: the first is localizing, the second is not
+    KillTaskAction tAction1 = new KillTaskAction(taskAttemptId1);
+    RunningJob rjob3 = new RunningJob(tAction1.getTaskID().getJobID());
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob3.getJobID(), rjob3);
+    rjob3.localizing = true;
+    tt.addActionToCleanup(tAction1);
+
+    KillTaskAction tAction2 = new KillTaskAction(taskAttemptId2);
+    RunningJob rjob4 = new RunningJob(tAction2.getTaskID().getJobID());
+    rjob4.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob4.getJobID(), rjob4);
+    tt.addActionToCleanup(tAction2);
+
+    // before running task clean up, check the queue
+    checkItemCountInQueue(tt, 4, 4, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // since the first attempt is localizing it will move to the
+    // localizing queue
+    checkItemCountInQueue(tt, 4, 3, 1);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the 2nd round the head of the queue is a job that can be cleaned up
+    checkItemCountInQueue(tt, 3, 2, 1);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the 3rd round the 3rd task is getting localized
+    checkItemCountInQueue(tt, 3, 1, 2);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after another round the 4th task gets killed
+    checkItemCountInQueue(tt, 2, 0, 2);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // nothing should change as the tasks are being localized
+    checkItemCountInQueue(tt, 2, 0, 2);
+
+  }
+
+  // test when some items in the queue are localizing and some are not,
+  // and we update the status of the actions between clean-up rounds
+  @Test
+  public void testMixedJobActionsAndUpdateActions()
+      throws InterruptedException, IOException {
+    // job and attempt ids
+    JobID jobId1 = new JobID(jtIdentifier, 1);
+    JobID jobId2 = new JobID(jtIdentifier, 2);
+
+    TaskAttemptID taskAttemptId1 = new TaskAttemptID(jtIdentifier, 3, true, 1,
+        1);
+    TaskAttemptID taskAttemptId2 = new TaskAttemptID(jtIdentifier, 4, true, 1,
+        1);
+
+    // job actions: the first is localizing, the second is not
+    KillJobAction jAction1 = new KillJobAction(jobId1);
+    RunningJob rjob1 = new RunningJob(jAction1.getJobID());
+    rjob1.localizing = true;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId1, rjob1);
+    tt.addActionToCleanup(jAction1);
+
+    KillJobAction jAction2 = new KillJobAction(jobId2);
+    RunningJob rjob2 = new RunningJob(jAction2.getJobID());
+    rjob2.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId2, rjob2);
+    tt.addActionToCleanup(jAction2);
+
+    // task actions: the first is localizing, the second is not
+    KillTaskAction tAction1 = new KillTaskAction(taskAttemptId1);
+    RunningJob rjob3 = new RunningJob(tAction1.getTaskID().getJobID());
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob3.getJobID(), rjob3);
+    rjob3.localizing = true;
+    tt.addActionToCleanup(tAction1);
+
+    KillTaskAction tAction2 = new KillTaskAction(taskAttemptId2);
+    RunningJob rjob4 = new RunningJob(tAction2.getTaskID().getJobID());
+    rjob4.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob4.getJobID(), rjob4);
+    tt.addActionToCleanup(tAction2);
+
+    // before running task clean up, check the queue
+    checkItemCountInQueue(tt, 4, 4, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // since the first attempt is localizing it will move to the
+    // localizing queue
+    checkItemCountInQueue(tt, 4, 3, 1);
+
+    // before running clean up again, mark the first job as no longer
+    // localizing.
+    rjob1.localizing = false;
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the 2nd round the head of the queue is a job that can be cleaned
+    // up, so it is cleaned, and the job that was localizing has finished, so
+    // its action moves back to the active queue
+    checkItemCountInQueue(tt, 3, 3, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the 3rd round the 3rd task's job is still localizing,
+    // so its action gets moved to the localizing queue.
+    checkItemCountInQueue(tt, 3, 2, 1);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after another round the 4th task gets killed, leaving 2 actions:
+    // one whose job switched from localizing to ready for clean up
+    // and one whose job is still being localized.
+    checkItemCountInQueue(tt, 2, 1, 1);
+  }
+
+  /*
+   * Method to check the number of items in the various data structures
+   * holding clean-up actions
+   */
+  private void checkItemCountInQueue(TaskTracker tt, int allSize,
+      int activeSize, int inactiveSize) {
+    // check the size and the content of the hashset and queue
+    assertEquals("Size of allCleanUpActions is not " + allSize, allSize,
+        tt.allCleanupActions.size());
+    assertEquals("Size of activeCleanUpActions is not " + activeSize,
+        activeSize, tt.activeCleanupActions.size());
+    assertEquals("Size of inactiveCleanUpActions is not " + inactiveSize,
+        inactiveSize, tt.inactiveCleanupActions.size());
+  }
+
+  class TestTaskTracker extends TaskTracker {
+    // override the method so it's a no-op
+    synchronized void purgeJob(KillJobAction action) throws IOException {
+      LOG.info("Received 'KillJobAction' for job: " + action.getJobID());
+    }
+
+    // override the method so it's a no-op
+    void processKillTaskAction(KillTaskAction killAction) throws IOException {
+      LOG.info("Received KillTaskAction for task: " + killAction.getTaskID());
+    }
+  }
+}
\ No newline at end of file
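
As a reading aid for the test above: the assertions imply that allCleanupActions de-duplicates incoming actions, activeCleanupActions holds actions that are ready to be purged, inactiveCleanupActions parks actions whose job is still localizing, and each taskCleanUp() call handles at most one action, re-activating parked actions once their job has finished localizing. Below is a minimal, self-contained model of that behaviour only; all names and types are hypothetical and this is not the branch-1-win TaskTracker code.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Iterator;
    import java.util.LinkedHashSet;
    import java.util.Set;

    /** Behavioural model of the cleanup bookkeeping the test asserts on. */
    class CleanupQueueModel {
      static class Job { boolean localizing; }
      static class Action {
        final String id; final Job job;
        Action(String id, Job job) { this.id = id; this.job = job; }
      }

      final Set<String> allCleanupActions = new LinkedHashSet<String>();
      final Deque<Action> activeCleanupActions = new ArrayDeque<Action>();
      final Deque<Action> inactiveCleanupActions = new ArrayDeque<Action>();

      void addActionToCleanup(Action a) {
        if (allCleanupActions.add(a.id)) {     // duplicates never reach the queue
          activeCleanupActions.add(a);
        }
      }

      void taskCleanUp() {
        // actions whose job has finished localizing become active again
        for (Iterator<Action> it = inactiveCleanupActions.iterator(); it.hasNext();) {
          Action parked = it.next();
          if (!parked.job.localizing) {
            it.remove();
            activeCleanupActions.add(parked);
          }
        }
        Action a = activeCleanupActions.poll();
        if (a == null) {
          return;                              // nothing to do (the real code sleeps here)
        }
        if (a.job.localizing) {
          inactiveCleanupActions.add(a);       // park until localization finishes
        } else {
          allCleanupActions.remove(a.id);      // purge the job / kill the task
        }
      }
    }

Running this model through the same sequences reproduces the (all, active, inactive) counts the test checks, for example (4, 3, 1) after one round of testLocalizingJobActions.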

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerVersionCheck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerVersionCheck.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerVersionCheck.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerVersionCheck.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.util.VersionInfo;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Test the version check the TT performs when connecting to the JT
+ */
+public class TestTaskTrackerVersionCheck {
+
+  /**
+   * Test the default TT version checking
+   */
+  @Test
+  public void testDefaultVersionCheck() throws IOException {
+    MiniMRCluster mr = null;
+    try {
+      JobConf jtConf = new JobConf();
+      mr = new MiniMRCluster(1, "file:///", 1, null, null, jtConf);
+      TaskTracker tt = mr.getTaskTrackerRunner(0).getTaskTracker();
+      String currBuildVersion = VersionInfo.getBuildVersion();
+      String currVersion = VersionInfo.getVersion();
+
+      assertTrue(tt.isPermittedVersion(currBuildVersion, currVersion));
+      assertFalse("We disallow different versions",
+          tt.isPermittedVersion(currBuildVersion+"x", currVersion+"x"));
+      assertFalse("We disallow different full versions with same version",
+          tt.isPermittedVersion(currBuildVersion+"x", currVersion));      
+      try {
+        tt.isPermittedVersion(currBuildVersion, currVersion+"x");
+        fail("Matched full version with mismatched version");
+      } catch (AssertionError ae) {
+        // Expected. The versions should always match if the full
+        // versions match as the full version contains the version.
+      }
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test the "relaxed" TT version checking
+   */
+  @Test
+  public void testRelaxedVersionCheck() throws IOException {
+    MiniMRCluster mr = null;
+    try {
+      JobConf jtConf = new JobConf();
+      jtConf.setBoolean(
+          CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY, true);
+      mr = new MiniMRCluster(1, "file:///", 1, null, null, jtConf);
+      TaskTracker tt = mr.getTaskTrackerRunner(0).getTaskTracker();
+      String currFullVersion = VersionInfo.getBuildVersion();
+      String currVersion = VersionInfo.getVersion();
+
+      assertTrue(tt.isPermittedVersion(currFullVersion, currVersion));
+      assertFalse("We disallow different versions",
+          tt.isPermittedVersion(currFullVersion+"x", currVersion+"x"));
+      assertTrue("We allow different full versions with same version",
+          tt.isPermittedVersion(currFullVersion+"x", currVersion));
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test no TT version checking
+   */
+  @Test
+  public void testNoVersionCheck() throws IOException {
+    MiniMRCluster mr = null;
+    try {
+      JobConf jtConf = new JobConf();
+      jtConf.setBoolean(
+          CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY, true);
+      mr = new MiniMRCluster(1, "file:///", 1, null, null, jtConf);
+      TaskTracker tt = mr.getTaskTrackerRunner(0).getTaskTracker();
+      String currFullVersion = VersionInfo.getBuildVersion();
+      String currVersion = VersionInfo.getVersion();
+
+      assertTrue(tt.isPermittedVersion(currFullVersion, currVersion));
+      assertTrue(tt.isPermittedVersion(currFullVersion+"x", currVersion+"x"));
+      assertTrue(tt.isPermittedVersion(currFullVersion+"x", currVersion));
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+}
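
The three tests above exercise TaskTracker.isPermittedVersion under the default, relaxed (HADOOP_RELAXED_VERSION_CHECK_KEY) and skip (HADOOP_SKIP_VERSION_CHECK_KEY) modes. A rough standalone sketch of the decision they imply is given below; the class, method and parameter names are hypothetical and this is not the actual TaskTracker code. In particular, the real code asserts (rather than returning false) when the build versions match but the versions do not, as testDefaultVersionCheck shows.

    /** Hypothetical helper sketching the checks the tests above imply. */
    public class VersionCheckSketch {
      static boolean permitted(boolean skipCheck, boolean relaxedCheck,
          String jtBuildVersion, String jtVersion,
          String ttBuildVersion, String ttVersion) {
        if (skipCheck) {
          return true;                  // "skip" mode: everything is allowed
        }
        if (!jtVersion.equals(ttVersion)) {
          return false;                 // a different version is always rejected
        }
        // same version: the full build version must also match unless relaxed
        return relaxedCheck || jtBuildVersion.equals(ttBuildVersion);
      }
    }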

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Fri Jun 21 06:37:27 2013
@@ -20,16 +20,20 @@ package org.apache.hadoop.mapred;
 
 import java.io.*;
 import java.util.*;
-import junit.framework.TestCase;
 
-import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.compress.*;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class TestTextInputFormat extends TestCase {
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestTextInputFormat {
   private static final Log LOG =
     LogFactory.getLog(TestTextInputFormat.class.getName());
 
@@ -39,17 +43,20 @@ public class TestTextInputFormat extends
   private static FileSystem localFs = null; 
   static {
     try {
+      defaultConf.set("fs.default.name", "file:///");
       localFs = FileSystem.getLocal(defaultConf);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
     }
   }
-  private static Path workDir = 
-    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
-             "TestTextInputFormat");
-  
+
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+             "TestTextInputFormat").makeQualified(localFs);
+
+  @Test
   public void testFormat() throws Exception {
-    JobConf job = new JobConf();
+    JobConf job = new JobConf(defaultConf);
     Path file = new Path(workDir, "test.txt");
 
     // A reporter that does nothing
@@ -127,6 +134,95 @@ public class TestTextInputFormat extends
     }
   }
 
+  @Test
+  public void testSplitableCodecs() throws IOException {
+    JobConf conf = new JobConf(defaultConf);
+    int seed = new Random().nextInt();
+    // Create the codec
+    CompressionCodec codec = null;
+    try {
+      codec = (CompressionCodec)
+      ReflectionUtils.newInstance(conf.getClassByName("org.apache.hadoop.io.compress.BZip2Codec"), conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Illegal codec!");
+    }
+    Path file = new Path(workDir, "test"+codec.getDefaultExtension());
+
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
+    LOG.info("seed = "+seed);
+    Random random = new Random(seed);
+    FileSystem localFs = FileSystem.getLocal(conf);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(conf, workDir);
+
+    final int MAX_LENGTH = 500000;
+
+    // for a variety of lengths
+    for (int length = MAX_LENGTH / 2; length < MAX_LENGTH;
+        length += random.nextInt(MAX_LENGTH / 4)+1) {
+
+      LOG.info("creating; entries = " + length);
+
+      // create a file with length entries
+      Writer writer =
+        new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+
+      // try splitting the file in a variety of sizes
+      TextInputFormat format = new TextInputFormat();
+      format.configure(conf);
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      for (int i = 0; i < 3; i++) {
+        int numSplits = random.nextInt(MAX_LENGTH/2000)+1;
+        LOG.info("splitting: requesting = " + numSplits);
+        InputSplit[] splits = format.getSplits(conf, numSplits);
+        LOG.info("splitting: got =        " + splits.length);
+
+        // check each split
+        BitSet bits = new BitSet(length);
+        for (int j = 0; j < splits.length; j++) {
+          LOG.debug("split["+j+"]= " + splits[j]);
+          RecordReader<LongWritable, Text> reader =
+            format.getRecordReader(splits[j], conf, reporter);
+          try {
+            int counter = 0;
+            while (reader.next(key, value)) {
+              int v = Integer.parseInt(value.toString());
+              LOG.debug("read " + v);
+
+              if (bits.get(v)) {
+                LOG.warn("conflict with " + v +
+                    " in split " + j +
+                    " at position "+reader.getPos());
+              }
+              assertFalse("Key in multiple partitions.", bits.get(v));
+              bits.set(v);
+              counter++;
+            }
+            if (counter > 0) {
+              LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
+            } else {
+              LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
+            }
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+    }
+  }
+
   private static LineReader makeStream(String str) throws IOException {
     return new LineReader(new ByteArrayInputStream
                                              (str.getBytes("UTF-8")), 
@@ -138,6 +234,7 @@ public class TestTextInputFormat extends
                                            bufsz);
   }
   
+  @Test
   public void testUTF8() throws Exception {
     LineReader in = makeStream("abcd\u20acbdcd\u20ac");
     Text line = new Text();
@@ -156,6 +253,7 @@ public class TestTextInputFormat extends
    *
    * @throws Exception
    */
+  @Test
   public void testNewLines() throws Exception {
     final String STR = "a\nbb\n\nccc\rdddd\r\r\r\n\r\neeeee";
     final int STRLENBYTES = STR.getBytes().length;
@@ -195,6 +293,7 @@ public class TestTextInputFormat extends
    *
    * @throws Exception
    */
+  @Test
   public void testMaxLineLength() throws Exception {
     final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
     final int STRLENBYTES = STR.getBytes().length;
@@ -253,8 +352,9 @@ public class TestTextInputFormat extends
   /**
    * Test using the gzip codec for reading
    */
-  public static void testGzip() throws IOException {
-    JobConf job = new JobConf();
+  @Test
+  public void testGzip() throws IOException {
+    JobConf job = new JobConf(defaultConf);
     CompressionCodec gzip = new GzipCodec();
     ReflectionUtils.setConf(gzip, job);
     localFs.delete(workDir, true);
@@ -286,8 +386,9 @@ public class TestTextInputFormat extends
   /**
    * Test using the gzip codec and an empty input file
    */
-  public static void testGzipEmpty() throws IOException {
-    JobConf job = new JobConf();
+  @Test
+  public void testGzipEmpty() throws IOException {
+    JobConf job = new JobConf(defaultConf);
     CompressionCodec gzip = new GzipCodec();
     ReflectionUtils.setConf(gzip, job);
     localFs.delete(workDir, true);
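
The new testSplitableCodecs case above verifies split correctness with a BitSet: every record index written to the bzip2 file must be read back from exactly one split. A tiny standalone illustration of that bookkeeping follows; the split contents are made up for the example.

    import java.util.BitSet;

    public class SplitCoverageDemo {
      public static void main(String[] args) {
        int length = 10;
        BitSet seen = new BitSet(length);
        // hypothetical record indexes read back from three splits
        int[][] splits = { {0, 1, 2, 3}, {4, 5, 6}, {7, 8, 9} };
        for (int[] split : splits) {
          for (int v : split) {
            if (seen.get(v)) {
              throw new AssertionError("Key in multiple partitions: " + v);
            }
            seen.set(v);
          }
        }
        if (seen.cardinality() != length) {
          throw new AssertionError("Some keys in no partition.");
        }
        System.out.println("every record appeared in exactly one split");
      }
    }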

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java Fri Jun 21 06:37:27 2013
@@ -130,7 +130,7 @@ public class TestUserDefinedCounters ext
     assertEquals(4,
         runningJob.getCounters().getGroup("StringCounter")
         .getCounter("MapRecords"));
-    assertTrue(counters.getGroupNames().size() <= 51);
+    assertTrue(counters.getGroupNames().size() <= Counters.MAX_GROUP_LIMIT);
     int i = 0;
     while (counters.size() < Counters.MAX_COUNTER_LIMIT) {
       counters.incrCounter("IncrCounter", "limit " + i, 2);

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java Fri Jun 21 06:37:27 2013
@@ -20,13 +20,20 @@ package org.apache.hadoop.mapred;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.net.URL;
 import java.net.HttpURLConnection;
+import java.net.URL;
 import java.net.URLEncoder;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -35,20 +42,13 @@ import org.apache.hadoop.mapred.JobHisto
 import org.apache.hadoop.mapred.JobHistory.TaskAttempt;
 import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.HadoopPolicyProvider;
+import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.junit.Test;
 
-import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
 public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
 
   private static final Log LOG = LogFactory.getLog(
@@ -87,7 +87,13 @@ public class TestWebUIAuthorization exte
   static int getHttpStatusCode(String urlstring, String userName,
       String method) throws IOException {
     LOG.info("Accessing " + urlstring + " as user " + userName);
-    URL url = new URL(urlstring + "&user.name=" + userName);
+    URL url = null;
+    if (userName == null) {
+      url = new URL(urlstring);
+    } else {
+      url = new URL(urlstring + "&user.name=" + userName);
+    }
+
     HttpURLConnection connection = (HttpURLConnection)url.openConnection();
     connection.setRequestMethod(method);
     if (method.equals("POST")) {
@@ -776,7 +782,13 @@ public class TestWebUIAuthorization exte
     Properties props = new Properties();
     props.setProperty("hadoop.http.filter.initializers",
         DummyFilterInitializer.class.getName());
-    props.setProperty(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, "true");
+    props.setProperty(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, 
+        "true");
+    props.setProperty(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+        HadoopPolicyProvider.class.getName());
+    props.setProperty(
+        CommonConfigurationKeys.HADOOP_SECURITY_INSTRUMENTATION_REQUIRES_ADMIN, 
+        "true");
     props.setProperty(JobConf.MR_ADMINS, mrAdminUser + " " + mrAdminGroup);
 
     startCluster(true, props);
@@ -908,9 +920,16 @@ public class TestWebUIAuthorization exte
     String taskGraphServlet = jtURL + "/taskgraph?type=map&jobid="
         + jobid.toString();
     validateViewJob(taskGraphServlet, "GET");
+    assertEquals("Incorrect return code for null user",
+        HttpURLConnection.HTTP_UNAUTHORIZED,
+        getHttpStatusCode(taskGraphServlet, null, "GET"));
+    
     taskGraphServlet = jtURL + "/taskgraph?type=reduce&jobid="
         + jobid.toString();
     validateViewJob(taskGraphServlet, "GET");
+    assertEquals("Incorrect return code for null user",
+        HttpURLConnection.HTTP_UNAUTHORIZED,
+        getHttpStatusCode(taskGraphServlet, null, "GET"));
   }
 
   // validate access of jobdetails.jsp

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Jun 21 06:37:27 2013
@@ -843,6 +843,10 @@ public class UtilsForTests {
     JobConf conf = new JobConf();
     conf.set("mapred.job.tracker", "localhost:0");
     conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+    return getJobTracker(conf, qm);
+  }
+
+  static JobTracker getJobTracker(JobConf conf, QueueManager qm) {
     JobTracker jt;
     try {
       if (qm == null) {
@@ -850,6 +854,10 @@ public class UtilsForTests {
       } else {
         jt = new JobTracker(conf, qm);
       }
+      jt.setSafeModeInternal(JobTracker.SafeModeAction.SAFEMODE_ENTER);
+      jt.initializeFilesystem();
+      jt.setSafeModeInternal(JobTracker.SafeModeAction.SAFEMODE_LEAVE);
+      jt.initialize();
       return jt;
     } catch (Exception e) {
       throw new RuntimeException("Could not start jt", e);
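
The new getJobTracker(JobConf, QueueManager) overload lets a test supply its own configuration while still running the safe-mode enter / initializeFilesystem / leave / initialize sequence shown above. A hedged usage sketch follows; the class and method names here are hypothetical, it mirrors the properties the no-argument overload sets, and it passes null to use the JobTracker's default QueueManager.

    package org.apache.hadoop.mapred;   // same package as UtilsForTests

    public class GetJobTrackerUsageSketch {
      static JobTracker startLocalJobTracker() {
        JobConf conf = new JobConf();
        conf.set("mapred.job.tracker", "localhost:0");             // in-process JT, ephemeral port
        conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
        return UtilsForTests.getJobTracker(conf, null);
      }
    }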

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Fri Jun 21 06:37:27 2013
@@ -18,6 +18,9 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.zip.GZIPOutputStream;
 
 import junit.framework.TestCase;
 
@@ -25,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -66,6 +70,7 @@ public class TestCombineFileInputFormat 
   final Path dir2 = new Path(inDir, "/dir2");
   final Path dir3 = new Path(inDir, "/dir3");
   final Path dir4 = new Path(inDir, "/dir4");
+  final Path dir5 = new Path(inDir, "/dir5");
 
   static final int BLOCKSIZE = 1024;
   static final byte[] databuf = new byte[BLOCKSIZE];
@@ -81,6 +86,24 @@ public class TestCombineFileInputFormat 
       return null;
     }
   }
+  
+  /** Dummy class to extend CombineFileInputFormat. It allows 
+   * non-existent files to be passed into the CombineFileInputFormat, allowing
+   * for easy testing without having to create real files.
+   */
+  private class DummyInputFormat1 extends DummyInputFormat {
+    @Override
+    protected FileStatus[] listStatus(JobConf job) throws IOException {
+      Path[] files = getInputPaths(job);
+      FileStatus[] results = new FileStatus[files.length];
+      for (int i = 0; i < files.length; i++) {
+        Path p = files[i];
+        FileSystem fs = p.getFileSystem(job);
+        results[i] = fs.getFileStatus(p);
+      }
+      return results;
+    }
+  }
 
   public void testSplitPlacement() throws IOException {
     String namenode = null;
@@ -89,16 +112,16 @@ public class TestCombineFileInputFormat 
     FileSystem fileSys = null;
     String testName = "TestSplitPlacement";
     try {
-      /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
-       * 1) file1, just after starting the datanode on r1, with 
+      /* Start 3 datanodes, one each in rack r1, r2, r3. Create five files
+       * 1) file1 and file5, just after starting the datanode on r1, with 
        *    a repl factor of 1, and,
        * 2) file2, just after starting the datanode on r2, with 
        *    a repl factor of 2, and,
-       * 3) file3 after starting the all three datanodes, with a repl 
+       * 3) file3, file4 after starting all three datanodes, with a repl 
        *    factor of 3.
-       * At the end, file1 will be present on only datanode1, file2 will be
-       * present on datanode 1 and datanode2 and 
-       * file3 will be present on all datanodes. 
+       * At the end, file1, file5 will be present on only datanode1, file2 will 
+       * be present on datanode 1 and datanode2 and 
+       * file3, file4 will be present on all datanodes. 
        */
       JobConf conf = new JobConf();
       conf.setBoolean("dfs.replication.considerLoad", false);
@@ -114,6 +137,30 @@ public class TestCombineFileInputFormat 
       }
       Path file1 = new Path(dir1 + "/file1");
       writeFile(conf, file1, (short)1, 1);
+      // create another file on the same datanode
+      Path file5 = new Path(dir5 + "/file5");
+      writeFile(conf, file5, (short)1, 1);
+      // split it using a CombinedFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      JobConf job = new JobConf(conf);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+      InputSplit[] splits = inFormat.getSplits(job, 1);
+      System.out.println("Made splits(Test0): " + splits.length);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test0): " + split);
+      }
+      assertEquals(splits.length, 1);
+      CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
       dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
       dfs.waitActive();
 
@@ -122,14 +169,14 @@ public class TestCombineFileInputFormat 
       writeFile(conf, file2, (short)2, 2);
 
       // split it using a CombinedFile input format
-      DummyInputFormat inFormat = new DummyInputFormat();
+      inFormat = new DummyInputFormat();
       inFormat.setInputPaths(conf, dir1 + "," + dir2);
       inFormat.setMinSplitSizeRack(BLOCKSIZE);
-      InputSplit[] splits = inFormat.getSplits(conf, 1);
+      splits = inFormat.getSplits(conf, 1);
       System.out.println("Made splits(Test1): " + splits.length);
 
       // make sure that each split has different locations
-      CombineFileSplit fileSplit = null;
+      fileSplit = null;
       for (int i = 0; i < splits.length; ++i) {
         fileSplit = (CombineFileSplit) splits[i];
         System.out.println("File split(Test1): " + fileSplit);
@@ -439,7 +486,7 @@ public class TestCombineFileInputFormat 
       }
     }
   }
-
+  
   static void writeFile(Configuration conf, Path name,
       short replication, int numBlocks) throws IOException {
     FileSystem fileSys = FileSystem.get(conf);
@@ -447,13 +494,377 @@ public class TestCombineFileInputFormat 
     FSDataOutputStream stm = fileSys.create(name, true,
                                             conf.getInt("io.file.buffer.size", 4096),
                                             replication, (long)BLOCKSIZE);
+    writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
+  }
+
+  // Creates the gzip file and returns its FileStatus
+  static FileStatus writeGzipFile(Configuration conf, Path name,
+      short replication, int numBlocks) throws IOException {
+    FileSystem fileSys = FileSystem.get(conf);
+
+    GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
+        .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
+    writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
+    return fileSys.getFileStatus(name);
+  }
+
+  private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
+      OutputStream out, short replication, int numBlocks) throws IOException {
     for (int i = 0; i < numBlocks; i++) {
-      stm.write(databuf);
+      out.write(databuf);
     }
-    stm.close();
+    out.close();
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
   
+  public void testSplitPlacementForCompressedFiles() throws IOException {
+    MiniDFSCluster dfs = null;
+    FileSystem fileSys = null;
+    try {
+      /* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped
+       *  files
+       * 1) file1 and file5, just after starting the datanode on r1, with 
+       *    a repl factor of 1, and,
+       * 2) file2, just after starting the datanode on r2, with 
+       *    a repl factor of 2, and,
+       * 3) file3, file4 after starting all three datanodes, with a repl 
+       *    factor of 3.
+       * At the end, file1, file5 will be present on only datanode1, file2 will 
+       * be present on datanode 1 and datanode2 and 
+       * file3, file4 will be present on all datanodes. 
+       */
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      Path file1 = new Path(dir1 + "/file1.gz");
+      FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
+      // create another file on the same datanode
+      Path file5 = new Path(dir5 + "/file5.gz");
+      FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
+      // split it using a CombinedFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      JobConf job = new JobConf(conf);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+      InputSplit[] splits = inFormat.getSplits(job, 1);
+      System.out.println("Made splits(Test0): " + splits.length);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test0): " + split);
+      }
+      assertEquals(splits.length, 1);
+      CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f5.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+      
+      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
+      dfs.waitActive();
+
+      // create file on two datanodes.
+      Path file2 = new Path(dir2 + "/file2.gz");
+      FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);
+
+      // split it using a CombinedFile input format
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job, 1);
+      System.out.println("Made splits(Test1): " + splits.length);
+
+      // make sure that each split has different locations
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test1): " + split);
+      }
+      assertEquals(2, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // create another file on 3 datanodes and 3 racks.
+      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
+      dfs.waitActive();
+      Path file3 = new Path(dir3 + "/file3.gz");
+      FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test2): " + split);
+      }
+      assertEquals(3, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits[2];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // create file4 on all three racks
+      Path file4 = new Path(dir4 + "/file4.gz");
+      FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test3): " + split);
+      }
+      assertEquals(3, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits[2];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(f1.getLen());
+      inFormat.setMaxSplitSize(f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test4): " + split);
+      }
+      assertEquals(4, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f4.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits[2];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits[3];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is twice file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(f1.getLen());
+      inFormat.setMaxSplitSize(2 * f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test5): " + split);
+      }
+      assertEquals(3, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits[2];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is 4 times file1's length 
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(2 * f1.getLen());
+      inFormat.setMaxSplitSize(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test6): " + split);
+      }
+      assertEquals(2, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(f2.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size and min-split-size per rack is 4 times file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(4 * f1.getLen());
+      inFormat.setMinSplitSizeRack(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test7): " + split);
+      }
+      assertEquals(1, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(4, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+      // minimum split size per node is 4 times file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test8): " + split);
+      }
+      assertEquals(1, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(4, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+      // Rack 1 has file1, file2 and file3 and file4
+      // Rack 2 has file2 and file3 and file4
+      // Rack 3 has file3 and file4
+      // setup a filter so that only file1 and file2 can be combined
+      inFormat = new DummyInputFormat();
+      FileInputFormat.addInputPath(job, inDir);
+      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
+      inFormat.createPool(job, new TestFilter(dir1), 
+                          new TestFilter(dir2));
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test9): " + split);
+      }
+      assertEquals(3, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+      fileSplit = (CombineFileSplit) splits[2];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+
+      // measure performance when there are multiple pools and
+      // many files in each pool.
+      int numPools = 100;
+      int numFiles = 1000;
+      DummyInputFormat1 inFormat1 = new DummyInputFormat1();
+      for (int i = 0; i < numFiles; i++) {
+        FileInputFormat.setInputPaths(job, file1);
+      }
+      inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
+      final Path dirNoMatch1 = new Path(inDir, "/dirxx");
+      final Path dirNoMatch2 = new Path(inDir, "/diryy");
+      for (int i = 0; i < numPools; i++) {
+        inFormat1.createPool(job, new TestFilter(dirNoMatch1), 
+                            new TestFilter(dirNoMatch2));
+      }
+      long start = System.currentTimeMillis();
+      splits = inFormat1.getSplits(job, 1);
+      long end = System.currentTimeMillis();
+      System.out.println("Elapsed time for " + numPools + " pools" +
+                         " and " + numFiles + " files is " +
+                         (end - start) + " milliseconds.");
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+  
   /**
    * Test when input files are from non-default file systems
    */

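The pool set up for Test9 above, createPool(job, new TestFilter(dir1), new TestFilter(dir2)), controls which files may be combined into a single split. As a rough standalone sketch, assuming illustrative class and method names that are not part of this patch or of the Hadoop API, the pooling decision boils down to a path-prefix check:

    import java.util.Arrays;
    import java.util.List;

    public class PoolFilterSketch {

      /** Returns true when 'path' lives under 'dir' (simple prefix check). */
      static boolean underDir(String dir, String path) {
        String prefix = dir.endsWith("/") ? dir : dir + "/";
        return path.startsWith(prefix);
      }

      public static void main(String[] args) {
        List<String> files = Arrays.asList("/dir1/file1", "/dir2/file2",
                                           "/dir3/file3", "/dir4/file4");
        for (String f : files) {
          // Files under dir1 or dir2 go to the filtered pool; the rest stay in
          // the default pool and are combined separately.
          boolean pooled = underDir("/dir1", f) || underDir("/dir2", f);
          System.out.println(f + " -> " + (pooled ? "pool(dir1,dir2)" : "default pool"));
        }
      }
    }

This mirrors why Test9 expects file1 and file2 to land in their own splits while file3 and file4 are combined in the default pool.
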
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java Fri Jun 21 06:37:27 2013
@@ -48,9 +48,6 @@ public class TestLineInputFormat extends
     JobConf job = new JobConf();
     Path file = new Path(workDir, "test.txt");
 
-    int seed = new Random().nextInt();
-    Random random = new Random(seed);
-
     localFs.delete(workDir, true);
     FileInputFormat.setInputPaths(job, workDir);
     int numLinesPerMap = 5;
@@ -58,7 +55,8 @@ public class TestLineInputFormat extends
 
     // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
-         length += random.nextInt(MAX_LENGTH/10) + 1) {
+         length += 1) {
+      System.out.println("Processing file of length "+length);
       // create a file with length entries
       Writer writer = new OutputStreamWriter(localFs.create(file));
       try {
@@ -69,14 +67,21 @@ public class TestLineInputFormat extends
       } finally {
         writer.close();
       }
-      checkFormat(job, numLinesPerMap);
+      int lastN = 0;
+      if (length != 0) {
+        lastN = length % numLinesPerMap;
+        if (lastN == 0) {
+          lastN = numLinesPerMap;
+        }
+      }
+      checkFormat(job, numLinesPerMap, lastN);
     }
   }
 
   // A reporter that does nothing
   private static final Reporter voidReporter = Reporter.NULL;
   
-  void checkFormat(JobConf job, int expectedN) throws IOException{
+  void checkFormat(JobConf job, int expectedN, int lastN) throws IOException{
     NLineInputFormat format = new NLineInputFormat();
     format.configure(job);
     int ignoredNumSplits = 1;
@@ -84,7 +89,8 @@ public class TestLineInputFormat extends
 
     // check all splits except last one
     int count = 0;
-    for (int j = 0; j < splits.length -1; j++) {
+    for (int j = 0; j < splits.length; j++) {
+      System.out.println("Processing split "+splits[j]);
       assertEquals("There are no split locations", 0,
                    splits[j].getLocations().length);
       RecordReader<LongWritable, Text> reader =
@@ -102,16 +108,22 @@ public class TestLineInputFormat extends
       try {
         count = 0;
         while (reader.next(key, value)) {
+          System.out.println("Got "+key+" "+value+" at count "+count+" of split "+j);
           count++;
         }
       } finally {
         reader.close();
       }
-      assertEquals("number of lines in split is " + expectedN ,
-                   expectedN, count);
+      if ( j == splits.length - 1) {
+        assertEquals("number of lines in split(" + j + ") is wrong" ,
+                     lastN, count);
+      } else {
+        assertEquals("number of lines in split(" + j + ") is wrong" ,
+                     expectedN, count);
+      }
     }
   }
-  
+
   public static void main(String[] args) throws Exception {
     new TestLineInputFormat().testFormat();
   }

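The reworked checkFormat() above now verifies the last split separately: every split should carry numLinesPerMap lines except the last, which carries the remainder (or a full numLinesPerMap when the line count divides evenly). A tiny standalone sketch of that arithmetic, with an assumed helper name that is not part of the patch:

    public class LastSplitLines {

      /** Lines expected in the final split for a file of totalLines lines. */
      static int lastSplitLines(int totalLines, int numLinesPerMap) {
        if (totalLines == 0) {
          return 0;                      // empty input: nothing to count
        }
        int rem = totalLines % numLinesPerMap;
        return rem == 0 ? numLinesPerMap : rem;
      }

      public static void main(String[] args) {
        System.out.println(lastSplitLines(12, 5));  // 2
        System.out.println(lastSplitLines(10, 5));  // 5
        System.out.println(lastSplitLines(0, 5));   // 0
      }
    }
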
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/db/TestConstructQuery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/db/TestConstructQuery.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/db/TestConstructQuery.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/db/TestConstructQuery.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
 package org.apache.hadoop.mapred.lib.db;
 
 import org.apache.hadoop.io.NullWritable;

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Fri Jun 21 06:37:27 2013
@@ -127,7 +127,7 @@ public class MapReduceTestUtil {
   public static Job createCopyJob(Configuration conf, Path outdir, 
       Path... indirs) throws Exception {
     conf.setInt("mapred.map.tasks", 3);
-    Job theJob = new Job(conf);
+    Job theJob = Job.getInstance(conf);
     theJob.setJobName("DataMoveJob");
 
     FileInputFormat.setInputPaths(theJob, indirs);
@@ -157,7 +157,7 @@ public class MapReduceTestUtil {
       fs.delete(outdir, true);
     }
     conf.setInt("mapred.map.max.attempts", 2);
-    Job theJob = new Job(conf);
+    Job theJob = Job.getInstance(conf);
     theJob.setJobName("Fail-Job");
 
     FileInputFormat.setInputPaths(theJob, indirs);
@@ -182,7 +182,7 @@ public class MapReduceTestUtil {
   public static Job createKillJob(Configuration conf, Path outdir, 
       Path... indirs) throws Exception {
 
-    Job theJob = new Job(conf);
+    Job theJob = Job.getInstance(conf);
     theJob.setJobName("Kill-Job");
 
     FileInputFormat.setInputPaths(theJob, indirs);
@@ -251,7 +251,7 @@ public class MapReduceTestUtil {
 
   public static Job createJob(Configuration conf, Path inDir, Path outDir, 
       int numInputFiles, int numReds, String input) throws IOException {
-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);
     FileSystem fs = FileSystem.get(conf);
     if (fs.exists(outDir)) {
       fs.delete(outDir, true);

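This hunk, like several others in the commit, swaps the deprecated new Job(conf) constructor for the Job.getInstance(...) factory. A minimal sketch of the pattern, assuming a hypothetical helper name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class JobFactoryExample {

      /** Builds a named Job via the factory method instead of the constructor. */
      public static Job newNamedJob(Configuration conf, String name)
          throws Exception {
        Job job = Job.getInstance(conf);   // preferred over: new Job(conf)
        job.setJobName(name);
        return job;
      }
    }
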
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestLocalRunner.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestLocalRunner.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestLocalRunner.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import org.junit.Test;
+import junit.framework.TestCase;
+
+/**
+ * Stress tests for the LocalJobRunner
+ */
+public class TestLocalRunner extends TestCase {
+
+  private static final Log LOG = LogFactory.getLog(TestLocalRunner.class);
+
+  private static class StressMapper
+      extends Mapper<LongWritable, Text, LongWritable, Text> {
+
+    // Different map tasks operate at different speeds.
+    // We define behavior for 6 threads.
+    private int threadId;
+
+    // Used to ensure that the compiler doesn't optimize away
+    // some code.
+    public long exposedState;
+
+    protected void setup(Context context) {
+      // Get the thread num from the file number.
+      FileSplit split = (FileSplit) context.getInputSplit();
+      Path filePath = split.getPath();
+      String name = filePath.getName();
+      this.threadId = Integer.valueOf(name);
+
+      LOG.info("Thread " + threadId + " : "
+          + context.getInputSplit());
+    }
+
+    /** Map method with different behavior based on the thread id */
+    public void map(LongWritable key, Text val, Context c)
+        throws IOException, InterruptedException {
+
+      switch(threadId) {
+      case 0:
+        // Write a single value and be done.
+        c.write(new LongWritable(0), val);
+        break;
+      case 1:
+      case 2:
+        // Write many values quickly.
+        for (int i = 0; i < 500; i++) {
+          c.write(new LongWritable(0), val);
+        }
+        break;
+      case 3:
+        // Write many values, using thread sleeps to delay this.
+        for (int i = 0; i < 50; i++) {
+          for (int j = 0; j < 10; j++) {
+            c.write(new LongWritable(0), val);
+          }
+          Thread.sleep(1);
+        }
+        break;
+      case 4:
+        // Write many values, using busy-loops to delay this.
+        for (int i = 0; i < 500; i++) {
+          for (int j = 0; j < 10000; j++) {
+            this.exposedState++;
+          }
+          c.write(new LongWritable(0), val);
+        }
+        break;
+      case 5:
+        // Write many values, using very slow busy-loops to delay this.
+        for (int i = 0; i < 500; i++) {
+          for (int j = 0; j < 100000; j++) {
+            this.exposedState++;
+          }
+          c.write(new LongWritable(0), val);
+        }
+        break;
+      default:
+        // Write a single value and be done.
+        c.write(new LongWritable(0), val);
+        break;
+      }
+    }
+
+    protected void cleanup(Context context) {
+      // Output this here, to ensure that the incrementing done in map()
+      // cannot be optimized away.
+      LOG.debug("Busy loop counter: " + this.exposedState);
+    }
+  }
+
+  private static class CountingReducer
+      extends Reducer<LongWritable, Text, LongWritable, LongWritable> {
+
+    public void reduce(LongWritable key, Iterable<Text> vals, Context context)
+        throws IOException, InterruptedException {
+      long out = 0;
+      for (Text val : vals) {
+        out++;
+      }
+
+      context.write(key, new LongWritable(out));
+    }
+  }
+
+  /**
+   * Create a single input file in the input directory.
+   * @param dirPath the directory in which the file resides
+   * @param id the file id number
+   * @param numRecords how many records to write to each file.
+   */
+  private void createInputFile(Path dirPath, int id, int numRecords)
+      throws IOException {
+    final String MESSAGE = "This is a line in a file: ";
+
+    Path filePath = new Path(dirPath, "" + id);
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    OutputStream os = fs.create(filePath);
+    BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+
+    for (int i = 0; i < numRecords; i++) {
+      w.write(MESSAGE + id + " " + i + "\n");
+    }
+
+    w.close();
+  }
+
+  // This is the total number of map output records we expect to generate,
+  // based on input file sizes (see createMultiMapsInput()) and the behavior
+  // of the different StressMapper threads.
+  private final static int TOTAL_RECORDS = 50000
+      + (500 * 500)
+      + (500 * 500)
+      + (20 * 500)
+      + (5000 * 500)
+      + (500 * 500);
+
+  private final String INPUT_DIR = "multiMapInput";
+  private final String OUTPUT_DIR = "multiMapOutput";
+
+  private Path getInputPath() {
+    String dataDir = System.getProperty("test.build.data");
+    if (null == dataDir) {
+      return new Path(INPUT_DIR);
+    } else {
+      return new Path(new Path(dataDir), INPUT_DIR);
+    }
+  }
+
+  private Path getOutputPath() {
+    String dataDir = System.getProperty("test.build.data");
+    if (null == dataDir) {
+      return new Path(OUTPUT_DIR);
+    } else {
+      return new Path(new Path(dataDir), OUTPUT_DIR);
+    }
+  }
+
+  /**
+   * Create the inputs for the MultiMaps test.
+   * @return the path to the input directory.
+   */
+  private Path createMultiMapsInput() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path inputPath = getInputPath();
+
+    // Clear the input directory if it exists, first.
+    if (fs.exists(inputPath)) {
+      fs.delete(inputPath, true);
+    }
+
+    // Create input files, with sizes calibrated based on
+    // the amount of work done in each mapper.
+    createInputFile(inputPath, 0, 50000);
+    createInputFile(inputPath, 1, 500);
+    createInputFile(inputPath, 2, 500);
+    createInputFile(inputPath, 3, 20);
+    createInputFile(inputPath, 4, 5000);
+    createInputFile(inputPath, 5, 500);
+
+    return inputPath;
+  }
+
+  /**
+   * Verify that we got the correct amount of output.
+   */
+  private void verifyOutput(Path outputPath) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    Path outputFile = new Path(outputPath, "part-r-00000");
+    InputStream is = fs.open(outputFile);
+    BufferedReader r = new BufferedReader(new InputStreamReader(is));
+
+    // Should get a single line of the form "0\t(count)"
+    String line = r.readLine().trim();
+    assertTrue("Line does not have correct key", line.startsWith("0\t"));
+    int count = Integer.valueOf(line.substring(2));
+    assertEquals("Incorrect count generated!", TOTAL_RECORDS, count);
+
+    r.close();
+
+  }
+
+  /**
+   * Run a test with several mappers in parallel, operating at different
+   * speeds. Verify that the correct amount of output is created.
+   */
+  @Test
+  public void testMultiMaps() throws Exception {
+    Job job = new Job();
+
+    Path inputPath = createMultiMapsInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(StressMapper.class);
+    job.setReducerClass(CountingReducer.class);
+    job.setNumReduceTasks(1);
+    LocalJobRunner.setLocalMaxRunningMaps(job, 6);
+    job.getConfiguration().set("io.sort.record.pct", "0.50");
+    job.getConfiguration().set("io.sort.mb", "25");
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.waitForCompletion(true);
+
+    verifyOutput(outputPath);
+  }
+
+  /**
+   * Run a test with a misconfigured number of mappers.
+   * Expect failure.
+   */
+  @Test
+  public void testInvalidMultiMapParallelism() throws Exception {
+    Job job = new Job();
+
+    Path inputPath = createMultiMapsInput();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setMapperClass(StressMapper.class);
+    job.setReducerClass(CountingReducer.class);
+    job.setNumReduceTasks(1);
+    LocalJobRunner.setLocalMaxRunningMaps(job, -6);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    boolean success = job.waitForCompletion(true);
+    assertFalse("Job succeeded somehow", success);
+  }
+
+  /** An InputFormat that creates no splits */
+  private static class EmptyInputFormat extends InputFormat<Object, Object> {
+    public List<InputSplit> getSplits(JobContext context) {
+      return new ArrayList<InputSplit>();
+    }
+
+    public RecordReader<Object, Object> createRecordReader(InputSplit split,
+        TaskAttemptContext context) {
+      return new EmptyRecordReader();
+    }
+  }
+
+  private static class EmptyRecordReader extends RecordReader<Object, Object> {
+    public void initialize(InputSplit split, TaskAttemptContext context) {
+    }
+
+    public Object getCurrentKey() {
+      return new Object();
+    }
+
+    public Object getCurrentValue() {
+      return new Object();
+    }
+
+    public float getProgress() {
+      return 0.0f;
+    }
+
+    public void close() {
+    }
+
+    public boolean nextKeyValue() {
+      return false;
+    }
+  }
+
+  /** Test case for zero mappers */
+  public void testEmptyMaps() throws Exception {
+    Job job = new Job();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    job.setInputFormatClass(EmptyInputFormat.class);
+    job.setNumReduceTasks(1);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    boolean success = job.waitForCompletion(true);
+    assertTrue("Empty job should work", success);
+  }
+}
\ No newline at end of file

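For reference, the TOTAL_RECORDS constant in TestLocalRunner above is the sum over input files of (records in the file) times (map outputs per record from StressMapper's switch). A standalone check of that arithmetic, not part of the patch:

    public class TotalRecordsCheck {
      public static void main(String[] args) {
        // Records per input file, from createMultiMapsInput().
        int[] recordsPerFile   = { 50000, 500, 500, 20, 5000, 500 };
        // Map outputs emitted per input record, from StressMapper's switch.
        int[] outputsPerRecord = { 1, 500, 500, 50 * 10, 500, 500 };

        long total = 0;
        for (int i = 0; i < recordsPerFile.length; i++) {
          total += (long) recordsPerFile[i] * outputsPerRecord[i];
        }
        System.out.println(total);   // 3310000, matching TOTAL_RECORDS
      }
    }
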
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMROutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMROutputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMROutputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMROutputFormat.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
 package org.apache.hadoop.mapreduce;
 
 import java.io.DataInput;

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Fri Jun 21 06:37:27 2013
@@ -97,7 +97,7 @@ public class TestMapReduceLocal extends 
       private float last = 0.0f;
       private boolean progressCalled = false;
       @Override
-      public float getProgress() {
+      public float getProgress() throws IOException {
         progressCalled = true;
         final float ret = super.getProgress();
         assertTrue("getProgress decreased", ret >= last);

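The patched getProgress() override above asserts that reported progress never decreases. A standalone illustration of that invariant, with names that are assumptions rather than part of the test:

    public class MonotonicProgress {
      private float last = 0.0f;

      /** Records a progress value, failing if it ever moves backwards. */
      float report(float progress) {
        if (progress < last) {
          throw new IllegalStateException("getProgress decreased: "
              + progress + " < " + last);
        }
        last = progress;
        return progress;
      }

      public static void main(String[] args) {
        MonotonicProgress p = new MonotonicProgress();
        p.report(0.25f);
        p.report(0.50f);
        p.report(0.50f);   // equal values are allowed
        System.out.println("all reports were monotonic");
      }
    }
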
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Fri Jun 21 06:37:27 2013
@@ -1130,7 +1130,7 @@ public class TestCombineFileInputFormat 
     dos.writeChars("Local file for CFIF");
     dos.close();
 
-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);
     FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
     DummyInputFormat inFormat = new DummyInputFormat();
     List<InputSplit> splits = inFormat.getSplits(job);

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java Fri Jun 21 06:37:27 2013
@@ -211,7 +211,7 @@ public class TestKeyValueTextInputFormat
    * Test using the gzip codec for reading
    */
   public static void testGzip() throws Exception {
-    Job job = new Job();
+    Job job = Job.getInstance();
     CompressionCodec gzip = new GzipCodec();
     ReflectionUtils.setConf(gzip, job.getConfiguration());
     localFs.delete(workDir, true);

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java Fri Jun 21 06:37:27 2013
@@ -49,37 +49,40 @@ public class TestNLineInputFormat extend
     Job job = new Job(conf);
     Path file = new Path(workDir, "test.txt");
 
-    int seed = new Random().nextInt();
-    Random random = new Random(seed);
-
     localFs.delete(workDir, true);
     FileInputFormat.setInputPaths(job, workDir);
     int numLinesPerMap = 5;
     NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
-    // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
-         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+         length += 1) {
+ 
       // create a file with length entries
       Writer writer = new OutputStreamWriter(localFs.create(file));
       try {
         for (int i = 0; i < length; i++) {
-          writer.write(Integer.toString(i));
+          writer.write(Integer.toString(i)+" some more text");
           writer.write("\n");
         }
       } finally {
         writer.close();
       }
-      checkFormat(job, numLinesPerMap);
+      int lastN = 0;
+      if (length != 0) {
+        lastN = length % numLinesPerMap;
+        if (lastN == 0) {
+          lastN = numLinesPerMap;
+        }
+      }
+      checkFormat(job, numLinesPerMap, lastN);
     }
   }
 
-  void checkFormat(Job job, int expectedN) 
+  void checkFormat(Job job, int expectedN, int lastN) 
       throws IOException, InterruptedException {
     NLineInputFormat format = new NLineInputFormat();
     List<InputSplit> splits = format.getSplits(job);
-    // check all splits except last one
     int count = 0;
-    for (int i = 0; i < splits.size() -1; i++) {
+    for (int i = 0; i < splits.size(); i++) {
       assertEquals("There are no split locations", 0,
                    splits.get(i).getLocations().length);
       TaskAttemptContext context = MapReduceTestUtil.
@@ -103,8 +106,13 @@ public class TestNLineInputFormat extend
       } finally {
         reader.close();
       }
-      assertEquals("number of lines in split is " + expectedN ,
-                   expectedN, count);
+      if ( i == splits.size() - 1) {
+        assertEquals("number of lines in split(" + i + ") is wrong" ,
+                     lastN, count);
+      } else {
+        assertEquals("number of lines in split(" + i + ") is wrong" ,
+                     expectedN, count);
+      }
     }
   }
   

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java Fri Jun 21 06:37:27 2013
@@ -173,7 +173,8 @@ public class TestMRSequenceFileAsBinaryO
 
   public void testcheckOutputSpecsForbidRecordCompression() 
       throws IOException {
-    Job job = new Job();
+    Job job = Job.getInstance(new Configuration(),
+        "testcheckOutputSpecsForbidRecordCompression");
     FileSystem fs = FileSystem.getLocal(job.getConfiguration());
     Path outputdir = new Path(System.getProperty("test.build.data", "/tmp") 
                               + "/output");

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java Fri Jun 21 06:37:27 2013
@@ -136,7 +136,7 @@ public class TestInputSampler {
     for (int i = 0; i < TOT_SPLITS; ++i) {
       inits[i] = i;
     }
-    Job ignored = new Job();
+    Job ignored = Job.getInstance();
     Object[] samples = sampler.getSample(new TestInputSamplerIF(
           NUM_SAMPLES, TOT_SPLITS, inits), ignored);
     assertEquals(NUM_SAMPLES, samples.length);

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Jun 21 06:37:27 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.secu
 
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -311,4 +312,15 @@ public class TestTokenCache {
     TokenCache.obtainTokensForNamenodesInternal(
       credentials, new Path [] {finalPath}, jConf);
   }
+
+
+  @Test
+  public void testCleanUpTokenReferral() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(TokenCache.MAPREDUCE_JOB_CREDENTIALS_BINARY, "foo");
+    TokenCache.cleanUpTokenReferral(conf);
+    assertNull(conf.get(TokenCache.MAPREDUCE_JOB_CREDENTIALS_BINARY));
+  }
+
+
 }

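The new testCleanUpTokenReferral() case above follows a set / clean / assert-absent pattern on the job configuration. A generic standalone sketch of that pattern, where java.util.Properties stands in for the Hadoop Configuration and the key name is only an assumption about the credentials-binary referral:

    import java.util.Properties;

    public class CleanupReferralSketch {
      // Assumed key name, standing in for TokenCache.MAPREDUCE_JOB_CREDENTIALS_BINARY.
      static final String CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";

      /** Drops the credentials-file referral so it is not shipped with the job. */
      static void cleanUpTokenReferral(Properties conf) {
        conf.remove(CREDENTIALS_BINARY);
      }

      public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty(CREDENTIALS_BINARY, "foo");
        cleanUpTokenReferral(conf);
        if (conf.getProperty(CREDENTIALS_BINARY) != null) {
          throw new AssertionError("referral should have been removed");
        }
        System.out.println("referral removed as expected");
      }
    }
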
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java Fri Jun 21 06:37:27 2013
@@ -65,6 +65,12 @@ public class TestDelegationTokenRenewal 
     private static Token<?> lastRenewed = null;
     private static Token<?> tokenToRenewIn2Sec = null;
 
+    private static void reset() {
+      counter = 0;
+      lastRenewed = null;
+      tokenToRenewIn2Sec = null;
+    }
+    
     @Override
     public boolean handleKind(Text kind) {
       return KIND.equals(kind);
@@ -109,6 +115,7 @@ public class TestDelegationTokenRenewal 
  
   @BeforeClass
   public static void setUp() throws Exception {
+    Renewer.reset();
     conf = new Configuration();
     conf.set("mapred.job.tracker", trackerService);
     
@@ -354,4 +361,19 @@ public class TestDelegationTokenRenewal 
       //expected
     }
   }
+
+  /**
+   * Run testDTRenewal(), close the DelegationTokenRenewal, and then run
+   * testDTRenewal() again to make sure that DelegationTokenRenewal can be
+   * re-used after it has been closed.
+   * @throws Exception
+   */
+  @Test
+  public void testDTRenewalAfterClose() throws Exception {
+    Renewer.counter = 0;
+    testDTRenewal();
+    DelegationTokenRenewal.close();
+    Renewer.counter = 0;
+    testDTRenewal();
+  }
 }
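
Renewer.reset() and testDTRenewalAfterClose() above both manage static fixture state that survives between test runs in the same JVM. A standalone sketch of that reset pattern, with illustrative class and field names:

    public class StaticFixtureReset {
      static int counter = 0;
      static String lastRenewed = null;

      /** Clears all static state so a scenario can run again from scratch. */
      static void reset() {
        counter = 0;
        lastRenewed = null;
      }

      static void runScenario() {
        counter++;
        lastRenewed = "token-" + counter;
      }

      public static void main(String[] args) {
        runScenario();
        System.out.println(counter + " " + lastRenewed);  // 1 token-1
        reset();                                          // as Renewer.reset() does
        runScenario();
        System.out.println(counter + " " + lastRenewed);  // 1 token-1 again
      }
    }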