Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/05/24 13:19:26 UTC

svn commit: r1126981 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ mr-client/hadoop-mapreduce-client-app/s...

Author: vinodkv
Date: Tue May 24 11:19:25 2011
New Revision: 1126981

URL: http://svn.apache.org/viewvc?rev=1126981&view=rev
Log:
Fix distributed-cache related bugs. (vinodkv)

Added:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailMapper.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/RandomTextWriterJob.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/SleepJob.java
Removed:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailMapper.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailingMapper.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/RandomTextWriterJob.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/SleepJob.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/SpeculationSleepJob.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
    hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java
    hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Tue May 24 11:19:25 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
 
+    Fix distributed-cache related bugs. (vinodkv)
+
     Fix for regression on the scheduling of reduces before maps are done (ddas)
 
     Fix NPE in test case (mahadev)

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Tue May 24 11:19:25 2011
@@ -25,20 +25,24 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -245,6 +249,10 @@ class YarnChild {
     // setup the child's attempt directories
     // Do the task-type specific localization
     task.localizeConfiguration(job);
+
+    // Set up the DistributedCache related configs
+    setupDistributedCacheConfig(job);
+
     // Overwrite the localized task jobconf which is linked to in the current
     // work-dir.
     Path localTaskFile = new Path(Constants.JOBFILE);
@@ -254,6 +262,62 @@ class YarnChild {
     return job;
   }
 
+  /**
+   * Set up the DistributedCache related configs to make
+   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
+   * and
+   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
+   * work.
+   * @param job the task's job configuration, updated with the localized paths
+   * @throws IOException
+   */
+  private static void setupDistributedCacheConfig(final JobConf job)
+      throws IOException {
+
+    String localWorkDir = System.getenv("PWD");
+    // all symlinks are created in the current work-dir (PWD)
+
+    // Update the configuration object with localized archives.
+    URI[] cacheArchives = DistributedCache.getCacheArchives(job);
+    if (cacheArchives != null) {
+      List<String> localArchives = new ArrayList<String>();
+      for (int i = 0; i < cacheArchives.length; ++i) {
+        URI u = cacheArchives[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localArchives.isEmpty()) {
+        job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+            .arrayToString(localArchives.toArray(new String[localArchives
+                .size()])));
+      }
+    }
+
+    // Update the configuration object with localized files.
+    URI[] cacheFiles = DistributedCache.getCacheFiles(job);
+    if (cacheFiles != null) {
+      List<String> localFiles = new ArrayList<String>();
+      for (int i = 0; i < cacheFiles.length; ++i) {
+        URI u = cacheFiles[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localFiles.isEmpty()) {
+        job.set(MRJobConfig.CACHE_LOCALFILES,
+            StringUtils.arrayToString(localFiles
+                .toArray(new String[localFiles.size()])));
+      }
+    }
+  }
+
   private static final FsPermission urw_gr =
     FsPermission.createImmutable((short) 0640);
 

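With these config keys populated, task code can recover the localized paths
through the accessors named in the javadoc above. A minimal sketch, assuming a
task-side helper (the class and method names here are illustrative, not part
of the patch):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;

    public class LocalizedArtifacts {
      // Reads back the paths that setupDistributedCacheConfig() publishes into
      // MRJobConfig.CACHE_LOCALFILES and MRJobConfig.CACHE_LOCALARCHIVES.
      public static void list(Configuration conf) throws IOException {
        Path[] files = DistributedCache.getLocalCacheFiles(conf);
        if (files != null) {
          for (Path f : files) {
            System.out.println("localized file: " + f); // symlink in the work-dir
          }
        }
        Path[] archives = DistributedCache.getLocalCacheArchives(conf);
        if (archives != null) {
          for (Path a : archives) {
            System.out.println("localized archive: " + a);
          }
        }
      }
    }
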
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue May 24 11:19:25 2011
@@ -145,8 +145,8 @@ public abstract class TaskAttemptImpl im
   private final Lock writeLock;
   private Collection<Token<? extends TokenIdentifier>> fsTokens;
   private Token<JobTokenIdentifier> jobToken;
-  private static AtomicBoolean initialEnvFlag = new AtomicBoolean();
-  private static Map<String, String> initialEnv = null;
+  private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
+  private static String initialClasspath = null;
 
   private long launchTime;
   private long finishTime;
@@ -455,19 +455,20 @@ public abstract class TaskAttemptImpl im
   }
 
   /**
-   * Lock this on initialEnv so that there is only one fork in the AM for
+   * Lock this on initialClasspath so that there is only one fork in the AM for
    * getting the initial class-path. TODO: This should go away once we construct
    * a parent CLC and use it for all the containers.
    */
-  private Map<String, String> getInitialEnv() throws IOException {
-    synchronized (initialEnvFlag) {
-      if (initialEnvFlag.get()) {
-        return initialEnv;
+  private String getInitialClasspath() throws IOException {
+    synchronized (initialClasspathFlag) {
+      if (initialClasspathFlag.get()) {
+        return initialClasspath;
       }
-      initialEnv = new HashMap<String, String>();
-      MRApps.setInitialClasspath(initialEnv);
-      initialEnvFlag.set(true);
-      return initialEnv;
+      Map<String, String> env = new HashMap<String, String>();
+      MRApps.setInitialClasspath(env);
+      initialClasspath = env.get(MRApps.CLASSPATH);
+      initialClasspathFlag.set(true);
+      return initialClasspath;
     }
   }
 
@@ -546,7 +547,7 @@ public abstract class TaskAttemptImpl im
               ByteBuffer.wrap(jobToken_dob.getData(), 0,
                   jobToken_dob.getLength()));
 
-      container.addAllEnv(getInitialEnv());
+      MRApps.addToClassPath(container.getAllEnv(), getInitialClasspath());
     } catch (IOException e) {
       throw new YarnException(e);
     }
@@ -612,7 +613,7 @@ public abstract class TaskAttemptImpl im
   private void parseDistributedCacheArtifacts(
       FileContext remoteFS, ContainerLaunchContext container, LocalResourceType type,
       URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], 
-      Path[] classpaths) throws IOException {
+      Path[] pathsToPutOnClasspath) throws IOException {
 
     if (uris != null) {
       // Sanity check
@@ -627,18 +628,18 @@ public abstract class TaskAttemptImpl im
       }
       
       Map<String, Path> classPaths = new HashMap<String, Path>();
-      if (classpaths != null) {
-        for (Path p : classpaths) {
+      if (pathsToPutOnClasspath != null) {
+        for (Path p : pathsToPutOnClasspath) {
+          p = p.makeQualified(remoteFS.getDefaultFileSystem()
+                .getUri(), remoteFS.getWorkingDirectory());
           classPaths.put(p.toUri().getPath().toString(), p);
         }
       }
       for (int i = 0; i < uris.length; ++i) {
         URI u = uris[i];
         Path p = new Path(u);
-        if (!p.isAbsolute()) {
-          p = p.makeQualified(remoteFS.getDefaultFileSystem()
+        p = p.makeQualified(remoteFS.getDefaultFileSystem()
               .getUri(), remoteFS.getWorkingDirectory());
-        }
         // Add URI fragment or just the filename
         Path name = new Path((null == u.getFragment())
           ? p.getName()
@@ -646,8 +647,9 @@ public abstract class TaskAttemptImpl im
         if (name.isAbsolute()) {
           throw new IllegalArgumentException("Resource name must be relative");
         }
+        String linkName = name.toUri().getPath();
         container.setLocalResource(
-            name.toUri().getPath(),
+            linkName,
             BuilderUtils.newLocalResource(recordFactory,
                 p.toUri(), type, 
                 visibilities[i]
@@ -656,23 +658,12 @@ public abstract class TaskAttemptImpl im
                 sizes[i], timestamps[i])
         );
         if (classPaths.containsKey(u.getPath())) {
-          addCacheArtifactToClassPath(container, name.toUri().getPath());
+          Map<String, String> environment = container.getAllEnv();
+          MRApps.addToClassPath(environment, linkName);
         }
       }
     }
   }
-
-  private static final String CLASSPATH = "CLASSPATH";
-  private static void addCacheArtifactToClassPath(
-      ContainerLaunchContext container, String fileName) {
-    String classpath = container.getEnv(CLASSPATH);
-    if (classpath == null) {
-      classpath = fileName;
-    } else {
-      classpath = classpath + ":" + fileName;
-    }
-    container.setEnv(CLASSPATH, classpath);
-  }
   
   // TODO - Move this to MR!
   private static long[] getFileSizes(Configuration conf, String key) {

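The getInitialClasspath() rewrite above uses a compute-once idiom: the static
AtomicBoolean serves both as the lock object and as the done flag, so only the
first caller pays for MRApps.setInitialClasspath(). A standalone sketch of the
same idiom (names are illustrative):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class ComputeOnce {
      private static final AtomicBoolean initFlag = new AtomicBoolean();
      private static String cachedValue = null;

      static String get() {
        synchronized (initFlag) {    // one thread computes, later callers reuse
          if (initFlag.get()) {
            return cachedValue;      // already computed
          }
          cachedValue = expensiveComputation();
          initFlag.set(true);        // publish inside the same lock
          return cachedValue;
        }
      }

      private static String expensiveComputation() {
        return "initial-classpath";  // stand-in for MRApps.setInitialClasspath()
      }
    }
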
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml Tue May 24 11:19:25 2011
@@ -65,4 +65,29 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <additionalClasspathElements>
+            <additionalClasspathElement>${basedir}/target/hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT-tests.jar</additionalClasspathElement>
+          </additionalClasspathElements>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
 </project>

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Tue May 24 11:19:25 2011
@@ -73,7 +73,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
@@ -227,7 +226,7 @@ public class YARNRunner implements Clien
 
     // Construct necessary information to start the MR AM
     ApplicationSubmissionContext appContext = 
-      getApplicationSubmissionContext(conf, jobSubmitDir, ts);
+      createApplicationSubmissionContext(conf, jobSubmitDir, ts);
     setupDistributedCache(conf, appContext);
     
     // XXX Remove
@@ -258,7 +257,7 @@ public class YARNRunner implements Clien
     return rsrc;
   }
 
-  private ApplicationSubmissionContext getApplicationSubmissionContext(
+  private ApplicationSubmissionContext createApplicationSubmissionContext(
       Configuration jobConf,
       String jobSubmitDir, Credentials ts) throws IOException {
     ApplicationSubmissionContext appContext =
@@ -357,9 +356,14 @@ public class YARNRunner implements Clien
     appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
     return appContext;
   }
-  
+
   /**
    * TODO: Copied for now from TaskAttemptImpl.java ... fixme
+   * 
+   * TODO: This is currently needed in YARNRunner as user code like setupJob,
+   * cleanupJob may need access to the dist-cache. Once we separate dist-cache
+   * for maps, reduces, setup etc., this can include only a subset of artifacts.
+   * This is also needed for the uberAM case where we run everything inside AM.
    */
   private void setupDistributedCache(Configuration conf, 
       ApplicationSubmissionContext container) throws IOException {
@@ -386,7 +390,7 @@ public class YARNRunner implements Clien
   private void parseDistributedCacheArtifacts(
       ApplicationSubmissionContext container, LocalResourceType type,
       URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], 
-      Path[] classpaths) throws IOException {
+      Path[] pathsToPutOnClasspath) {
 
     if (uris != null) {
       // Sanity check
@@ -401,18 +405,18 @@ public class YARNRunner implements Clien
       }
       
       Map<String, Path> classPaths = new HashMap<String, Path>();
-      if (classpaths != null) {
-        for (Path p : classpaths) {
+      if (pathsToPutOnClasspath != null) {
+        for (Path p : pathsToPutOnClasspath) {
+          p = p.makeQualified(this.defaultFileContext.getDefaultFileSystem()
+                .getUri(), this.defaultFileContext.getWorkingDirectory());
           classPaths.put(p.toUri().getPath().toString(), p);
         }
       }
       for (int i = 0; i < uris.length; ++i) {
         URI u = uris[i];
         Path p = new Path(u);
-        if (!p.isAbsolute()) {
-          p = p.makeQualified(this.defaultFileContext.getDefaultFileSystem()
+        p = p.makeQualified(this.defaultFileContext.getDefaultFileSystem()
               .getUri(), this.defaultFileContext.getWorkingDirectory());
-        }
         // Add URI fragment or just the filename
         Path name = new Path((null == u.getFragment())
           ? p.getName()
@@ -420,8 +424,9 @@ public class YARNRunner implements Clien
         if (name.isAbsolute()) {
           throw new IllegalArgumentException("Resource name must be relative");
         }
+        String linkName = name.toUri().getPath();
         container.setResourceTodo(
-            name.toUri().getPath(),
+            linkName,
             createLocalResource(
                 p.toUri(), type, 
                 visibilities[i]
@@ -431,8 +436,7 @@ public class YARNRunner implements Clien
         );
         if (classPaths.containsKey(u.getPath())) {
           Map<String, String> environment = container.getAllEnvironment();
-          MRApps.addToClassPath(environment, name.toUri().getPath());
-          container.addAllEnvironment(environment);
+          MRApps.addToClassPath(environment, linkName);
         }
       }
     }

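Both copies of parseDistributedCacheArtifacts() now derive the symlink name
the same way: prefer the URI fragment, fall back to the file name, and reject
absolute names. A self-contained illustration of that rule (the sample URIs
are assumptions):

    import java.net.URI;
    import org.apache.hadoop.fs.Path;

    public class LinkNames {
      // Mirrors the patch: the fragment wins, otherwise the last path component.
      static String linkNameFor(URI u) {
        Path p = new Path(u);
        Path name =
            new Path((null == u.getFragment()) ? p.getName() : u.getFragment());
        if (name.isAbsolute()) {
          throw new IllegalArgumentException("Resource name must be relative");
        }
        return name.toUri().getPath();
      }

      public static void main(String[] args) throws Exception {
        // prints "dep.jar"
        System.out.println(linkNameFor(new URI("hdfs://nn:8020/libs/dep.jar")));
        // prints "alias"
        System.out.println(linkNameFor(new URI("hdfs://nn:8020/libs/dep.jar#alias")));
      }
    }
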
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailMapper.java?rev=1126981&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailMapper.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailMapper.java Tue May 24 11:19:25 2011
@@ -0,0 +1,23 @@
+package org.apache.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+// Mapper that fails
+public class FailMapper extends MapReduceBase implements
+    Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+  public void map(WritableComparable key, Writable value,
+      OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+      throws IOException {
+    // NOTE: the next line is required for the TestDebugScript test to succeed
+    System.err.println("failing map");
+    throw new RuntimeException("failing map");
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java?rev=1126981&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java Tue May 24 11:19:25 2011
@@ -0,0 +1,44 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Fails the Mapper. The first attempt throws an exception; later attempts call System.exit.
+ *
+ */
+public class FailingMapper extends Mapper<Text, Text, Text, Text> {
+  public void map(Text key, Text value,
+      Context context) throws IOException,InterruptedException {
+    if (context.getTaskAttemptID().getId() == 0) {
+      System.out.println("Attempt:" + context.getTaskAttemptID() + 
+        " Failing mapper throwing exception");
+      throw new IOException("Attempt:" + context.getTaskAttemptID() + 
+          " Failing mapper throwing exception");
+    } else {
+      System.out.println("Attempt:" + context.getTaskAttemptID() + 
+      " Exiting");
+      System.exit(-1);
+    }
+  }
+}

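Tests usually wire FailingMapper into a small map-only job and assert that
waitForCompletion() returns false. A hedged sketch of such a driver (the job
setup is an assumption; only FailingMapper itself comes from this commit):

    package org.apache.hadoop;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class FailingJobDriver {
      public static boolean run(Configuration conf) throws Exception {
        Job job = new Job(conf);
        job.setJarByClass(FailingMapper.class);
        job.setMapperClass(FailingMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        // Input/output formats and paths omitted; a real test supplies them.
        // Every attempt fails (exception, then System.exit), so this is false.
        return job.waitForCompletion(true);
      }
    }
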
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/RandomTextWriterJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/RandomTextWriterJob.java?rev=1126981&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/RandomTextWriterJob.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/RandomTextWriterJob.java Tue May 24 11:19:25 2011
@@ -0,0 +1,758 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class RandomTextWriterJob extends Configured implements Tool {
+
+  public static final String TOTAL_BYTES = 
+    "mapreduce.randomtextwriter.totalbytes";
+  public static final String BYTES_PER_MAP = 
+    "mapreduce.randomtextwriter.bytespermap";
+  public static final String MAX_VALUE = "mapreduce.randomtextwriter.maxwordsvalue";
+  public static final String MIN_VALUE = "mapreduce.randomtextwriter.minwordsvalue";
+  public static final String MIN_KEY = "mapreduce.randomtextwriter.minwordskey";
+  public static final String MAX_KEY = "mapreduce.randomtextwriter.maxwordskey";
+
+  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
+
+  public Job createJob(Configuration conf) throws IOException {
+    long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 10 * 1024);
+    long totalBytesToWrite = conf.getLong(TOTAL_BYTES, numBytesToWritePerMap);
+    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
+    if (numMaps == 0 && totalBytesToWrite > 0) {
+      numMaps = 1;
+      conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
+    }
+    conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
+
+    Job job = new Job(conf);
+
+    job.setJarByClass(RandomTextWriterJob.class);
+    job.setJobName("random-text-writer");
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+
+    job.setInputFormatClass(RandomInputFormat.class);
+    job.setMapperClass(RandomTextMapper.class);
+
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    //FileOutputFormat.setOutputPath(job, new Path("random-output"));
+    job.setNumReduceTasks(0);
+    return job;
+  }
+
+  public static class RandomInputFormat extends InputFormat<Text, Text> {
+
+    /** 
+     * Generate the requested number of file splits, with the filename
+     * set to the filename of the output file.
+     */
+    public List<InputSplit> getSplits(JobContext job) throws IOException {
+      List<InputSplit> result = new ArrayList<InputSplit>();
+      Path outDir = FileOutputFormat.getOutputPath(job);
+      int numSplits = 
+            job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
+      for(int i=0; i < numSplits; ++i) {
+        result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
+                                  (String[])null));
+      }
+      return result;
+    }
+
+    /**
+     * Return a single record (filename, "") where the filename is taken from
+     * the file split.
+     */
+    public static class RandomRecordReader extends RecordReader<Text, Text> {
+      Path name;
+      Text key = null;
+      Text value = new Text();
+      public RandomRecordReader(Path p) {
+        name = p;
+      }
+      
+      public void initialize(InputSplit split,
+                             TaskAttemptContext context)
+      throws IOException, InterruptedException {
+        
+      }
+      
+      public boolean nextKeyValue() {
+        if (name != null) {
+          key = new Text();
+          key.set(name.getName());
+          name = null;
+          return true;
+        }
+        return false;
+      }
+      
+      public Text getCurrentKey() {
+        return key;
+      }
+      
+      public Text getCurrentValue() {
+        return value;
+      }
+      
+      public void close() {}
+
+      public float getProgress() {
+        return 0.0f;
+      }
+    }
+
+    public RecordReader<Text, Text> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      return new RandomRecordReader(((FileSplit) split).getPath());
+    }
+  }
+
+  public static class RandomTextMapper extends Mapper<Text, Text, Text, Text> {
+    
+    private long numBytesToWrite;
+    private int minWordsInKey;
+    private int wordsInKeyRange;
+    private int minWordsInValue;
+    private int wordsInValueRange;
+    private Random random = new Random();
+    
+    /**
+     * Save the configuration value that we need to write the data.
+     */
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      numBytesToWrite = conf.getLong(BYTES_PER_MAP,
+                                    1*1024*1024*1024);
+      minWordsInKey = conf.getInt(MIN_KEY, 5);
+      wordsInKeyRange = (conf.getInt(MAX_KEY, 10) - minWordsInKey);
+      minWordsInValue = conf.getInt(MIN_VALUE, 10);
+      wordsInValueRange = (conf.getInt(MAX_VALUE, 100) - minWordsInValue);
+    }
+    
+    /**
+     * Given an output filename, write a bunch of random records to it.
+     */
+    public void map(Text key, Text value,
+                    Context context) throws IOException,InterruptedException {
+      int itemCount = 0;
+      while (numBytesToWrite > 0) {
+        // Generate the key/value 
+        int noWordsKey = minWordsInKey + 
+          (wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0);
+        int noWordsValue = minWordsInValue + 
+          (wordsInValueRange != 0 ? random.nextInt(wordsInValueRange) : 0);
+        Text keyWords = generateSentence(noWordsKey);
+        Text valueWords = generateSentence(noWordsValue);
+        
+        // Write the sentence 
+        context.write(keyWords, valueWords);
+        
+        numBytesToWrite -= (keyWords.getLength() + valueWords.getLength());
+        
+        // Update counters, progress etc.
+        context.getCounter(Counters.BYTES_WRITTEN).increment(
+                  keyWords.getLength() + valueWords.getLength());
+        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
+        if (++itemCount % 200 == 0) {
+          context.setStatus("wrote record " + itemCount + ". " + 
+                             numBytesToWrite + " bytes left.");
+        }
+      }
+      context.setStatus("done with " + itemCount + " records.");
+    }
+    
+    private Text generateSentence(int noWords) {
+      StringBuffer sentence = new StringBuffer();
+      String space = " ";
+      for (int i=0; i < noWords; ++i) {
+        sentence.append(words[random.nextInt(words.length)]);
+        sentence.append(space);
+      }
+      return new Text(sentence.toString());
+    }
+
+    private static String[] words = {
+      "diurnalness", "Homoiousian",
+      "spiranthic", "tetragynian",
+      "silverhead", "ungreat",
+      "lithograph", "exploiter",
+      "physiologian", "by",
+      "hellbender", "Filipendula",
+      "undeterring", "antiscolic",
+      "pentagamist", "hypoid",
+      "cacuminal", "sertularian",
+      "schoolmasterism", "nonuple",
+      "gallybeggar", "phytonic",
+      "swearingly", "nebular",
+      "Confervales", "thermochemically",
+      "characinoid", "cocksuredom",
+      "fallacious", "feasibleness",
+      "debromination", "playfellowship",
+      "tramplike", "testa",
+      "participatingly", "unaccessible",
+      "bromate", "experientialist",
+      "roughcast", "docimastical",
+      "choralcelo", "blightbird",
+      "peptonate", "sombreroed",
+      "unschematized", "antiabolitionist",
+      "besagne", "mastication",
+      "bromic", "sviatonosite",
+      "cattimandoo", "metaphrastical",
+      "endotheliomyoma", "hysterolysis",
+      "unfulminated", "Hester",
+      "oblongly", "blurredness",
+      "authorling", "chasmy",
+      "Scorpaenidae", "toxihaemia",
+      "Dictograph", "Quakerishly",
+      "deaf", "timbermonger",
+      "strammel", "Thraupidae",
+      "seditious", "plerome",
+      "Arneb", "eristically",
+      "serpentinic", "glaumrie",
+      "socioromantic", "apocalypst",
+      "tartrous", "Bassaris",
+      "angiolymphoma", "horsefly",
+      "kenno", "astronomize",
+      "euphemious", "arsenide",
+      "untongued", "parabolicness",
+      "uvanite", "helpless",
+      "gemmeous", "stormy",
+      "templar", "erythrodextrin",
+      "comism", "interfraternal",
+      "preparative", "parastas",
+      "frontoorbital", "Ophiosaurus",
+      "diopside", "serosanguineous",
+      "ununiformly", "karyological",
+      "collegian", "allotropic",
+      "depravity", "amylogenesis",
+      "reformatory", "epidymides",
+      "pleurotropous", "trillium",
+      "dastardliness", "coadvice",
+      "embryotic", "benthonic",
+      "pomiferous", "figureheadship",
+      "Megaluridae", "Harpa",
+      "frenal", "commotion",
+      "abthainry", "cobeliever",
+      "manilla", "spiciferous",
+      "nativeness", "obispo",
+      "monilioid", "biopsic",
+      "valvula", "enterostomy",
+      "planosubulate", "pterostigma",
+      "lifter", "triradiated",
+      "venialness", "tum",
+      "archistome", "tautness",
+      "unswanlike", "antivenin",
+      "Lentibulariaceae", "Triphora",
+      "angiopathy", "anta",
+      "Dawsonia", "becomma",
+      "Yannigan", "winterproof",
+      "antalgol", "harr",
+      "underogating", "ineunt",
+      "cornberry", "flippantness",
+      "scyphostoma", "approbation",
+      "Ghent", "Macraucheniidae",
+      "scabbiness", "unanatomized",
+      "photoelasticity", "eurythermal",
+      "enation", "prepavement",
+      "flushgate", "subsequentially",
+      "Edo", "antihero",
+      "Isokontae", "unforkedness",
+      "porriginous", "daytime",
+      "nonexecutive", "trisilicic",
+      "morphiomania", "paranephros",
+      "botchedly", "impugnation",
+      "Dodecatheon", "obolus",
+      "unburnt", "provedore",
+      "Aktistetae", "superindifference",
+      "Alethea", "Joachimite",
+      "cyanophilous", "chorograph",
+      "brooky", "figured",
+      "periclitation", "quintette",
+      "hondo", "ornithodelphous",
+      "unefficient", "pondside",
+      "bogydom", "laurinoxylon",
+      "Shiah", "unharmed",
+      "cartful", "noncrystallized",
+      "abusiveness", "cromlech",
+      "japanned", "rizzomed",
+      "underskin", "adscendent",
+      "allectory", "gelatinousness",
+      "volcano", "uncompromisingly",
+      "cubit", "idiotize",
+      "unfurbelowed", "undinted",
+      "magnetooptics", "Savitar",
+      "diwata", "ramosopalmate",
+      "Pishquow", "tomorn",
+      "apopenptic", "Haversian",
+      "Hysterocarpus", "ten",
+      "outhue", "Bertat",
+      "mechanist", "asparaginic",
+      "velaric", "tonsure",
+      "bubble", "Pyrales",
+      "regardful", "glyphography",
+      "calabazilla", "shellworker",
+      "stradametrical", "havoc",
+      "theologicopolitical", "sawdust",
+      "diatomaceous", "jajman",
+      "temporomastoid", "Serrifera",
+      "Ochnaceae", "aspersor",
+      "trailmaking", "Bishareen",
+      "digitule", "octogynous",
+      "epididymitis", "smokefarthings",
+      "bacillite", "overcrown",
+      "mangonism", "sirrah",
+      "undecorated", "psychofugal",
+      "bismuthiferous", "rechar",
+      "Lemuridae", "frameable",
+      "thiodiazole", "Scanic",
+      "sportswomanship", "interruptedness",
+      "admissory", "osteopaedion",
+      "tingly", "tomorrowness",
+      "ethnocracy", "trabecular",
+      "vitally", "fossilism",
+      "adz", "metopon",
+      "prefatorial", "expiscate",
+      "diathermacy", "chronist",
+      "nigh", "generalizable",
+      "hysterogen", "aurothiosulphuric",
+      "whitlowwort", "downthrust",
+      "Protestantize", "monander",
+      "Itea", "chronographic",
+      "silicize", "Dunlop",
+      "eer", "componental",
+      "spot", "pamphlet",
+      "antineuritic", "paradisean",
+      "interruptor", "debellator",
+      "overcultured", "Florissant",
+      "hyocholic", "pneumatotherapy",
+      "tailoress", "rave",
+      "unpeople", "Sebastian",
+      "thermanesthesia", "Coniferae",
+      "swacking", "posterishness",
+      "ethmopalatal", "whittle",
+      "analgize", "scabbardless",
+      "naught", "symbiogenetically",
+      "trip", "parodist",
+      "columniform", "trunnel",
+      "yawler", "goodwill",
+      "pseudohalogen", "swangy",
+      "cervisial", "mediateness",
+      "genii", "imprescribable",
+      "pony", "consumptional",
+      "carposporangial", "poleax",
+      "bestill", "subfebrile",
+      "sapphiric", "arrowworm",
+      "qualminess", "ultraobscure",
+      "thorite", "Fouquieria",
+      "Bermudian", "prescriber",
+      "elemicin", "warlike",
+      "semiangle", "rotular",
+      "misthread", "returnability",
+      "seraphism", "precostal",
+      "quarried", "Babylonism",
+      "sangaree", "seelful",
+      "placatory", "pachydermous",
+      "bozal", "galbulus",
+      "spermaphyte", "cumbrousness",
+      "pope", "signifier",
+      "Endomycetaceae", "shallowish",
+      "sequacity", "periarthritis",
+      "bathysphere", "pentosuria",
+      "Dadaism", "spookdom",
+      "Consolamentum", "afterpressure",
+      "mutter", "louse",
+      "ovoviviparous", "corbel",
+      "metastoma", "biventer",
+      "Hydrangea", "hogmace",
+      "seizing", "nonsuppressed",
+      "oratorize", "uncarefully",
+      "benzothiofuran", "penult",
+      "balanocele", "macropterous",
+      "dishpan", "marten",
+      "absvolt", "jirble",
+      "parmelioid", "airfreighter",
+      "acocotl", "archesporial",
+      "hypoplastral", "preoral",
+      "quailberry", "cinque",
+      "terrestrially", "stroking",
+      "limpet", "moodishness",
+      "canicule", "archididascalian",
+      "pompiloid", "overstaid",
+      "introducer", "Italical",
+      "Christianopaganism", "prescriptible",
+      "subofficer", "danseuse",
+      "cloy", "saguran",
+      "frictionlessly", "deindividualization",
+      "Bulanda", "ventricous",
+      "subfoliar", "basto",
+      "scapuloradial", "suspend",
+      "stiffish", "Sphenodontidae",
+      "eternal", "verbid",
+      "mammonish", "upcushion",
+      "barkometer", "concretion",
+      "preagitate", "incomprehensible",
+      "tristich", "visceral",
+      "hemimelus", "patroller",
+      "stentorophonic", "pinulus",
+      "kerykeion", "brutism",
+      "monstership", "merciful",
+      "overinstruct", "defensibly",
+      "bettermost", "splenauxe",
+      "Mormyrus", "unreprimanded",
+      "taver", "ell",
+      "proacquittal", "infestation",
+      "overwoven", "Lincolnlike",
+      "chacona", "Tamil",
+      "classificational", "lebensraum",
+      "reeveland", "intuition",
+      "Whilkut", "focaloid",
+      "Eleusinian", "micromembrane",
+      "byroad", "nonrepetition",
+      "bacterioblast", "brag",
+      "ribaldrous", "phytoma",
+      "counteralliance", "pelvimetry",
+      "pelf", "relaster",
+      "thermoresistant", "aneurism",
+      "molossic", "euphonym",
+      "upswell", "ladhood",
+      "phallaceous", "inertly",
+      "gunshop", "stereotypography",
+      "laryngic", "refasten",
+      "twinling", "oflete",
+      "hepatorrhaphy", "electrotechnics",
+      "cockal", "guitarist",
+      "topsail", "Cimmerianism",
+      "larklike", "Llandovery",
+      "pyrocatechol", "immatchable",
+      "chooser", "metrocratic",
+      "craglike", "quadrennial",
+      "nonpoisonous", "undercolored",
+      "knob", "ultratense",
+      "balladmonger", "slait",
+      "sialadenitis", "bucketer",
+      "magnificently", "unstipulated",
+      "unscourged", "unsupercilious",
+      "packsack", "pansophism",
+      "soorkee", "percent",
+      "subirrigate", "champer",
+      "metapolitics", "spherulitic",
+      "involatile", "metaphonical",
+      "stachyuraceous", "speckedness",
+      "bespin", "proboscidiform",
+      "gul", "squit",
+      "yeelaman", "peristeropode",
+      "opacousness", "shibuichi",
+      "retinize", "yote",
+      "misexposition", "devilwise",
+      "pumpkinification", "vinny",
+      "bonze", "glossing",
+      "decardinalize", "transcortical",
+      "serphoid", "deepmost",
+      "guanajuatite", "wemless",
+      "arval", "lammy",
+      "Effie", "Saponaria",
+      "tetrahedral", "prolificy",
+      "excerpt", "dunkadoo",
+      "Spencerism", "insatiately",
+      "Gilaki", "oratorship",
+      "arduousness", "unbashfulness",
+      "Pithecolobium", "unisexuality",
+      "veterinarian", "detractive",
+      "liquidity", "acidophile",
+      "proauction", "sural",
+      "totaquina", "Vichyite",
+      "uninhabitedness", "allegedly",
+      "Gothish", "manny",
+      "Inger", "flutist",
+      "ticktick", "Ludgatian",
+      "homotransplant", "orthopedical",
+      "diminutively", "monogoneutic",
+      "Kenipsim", "sarcologist",
+      "drome", "stronghearted",
+      "Fameuse", "Swaziland",
+      "alen", "chilblain",
+      "beatable", "agglomeratic",
+      "constitutor", "tendomucoid",
+      "porencephalous", "arteriasis",
+      "boser", "tantivy",
+      "rede", "lineamental",
+      "uncontradictableness", "homeotypical",
+      "masa", "folious",
+      "dosseret", "neurodegenerative",
+      "subtransverse", "Chiasmodontidae",
+      "palaeotheriodont", "unstressedly",
+      "chalcites", "piquantness",
+      "lampyrine", "Aplacentalia",
+      "projecting", "elastivity",
+      "isopelletierin", "bladderwort",
+      "strander", "almud",
+      "iniquitously", "theologal",
+      "bugre", "chargeably",
+      "imperceptivity", "meriquinoidal",
+      "mesophyte", "divinator",
+      "perfunctory", "counterappellant",
+      "synovial", "charioteer",
+      "crystallographical", "comprovincial",
+      "infrastapedial", "pleasurehood",
+      "inventurous", "ultrasystematic",
+      "subangulated", "supraoesophageal",
+      "Vaishnavism", "transude",
+      "chrysochrous", "ungrave",
+      "reconciliable", "uninterpleaded",
+      "erlking", "wherefrom",
+      "aprosopia", "antiadiaphorist",
+      "metoxazine", "incalculable",
+      "umbellic", "predebit",
+      "foursquare", "unimmortal",
+      "nonmanufacture", "slangy",
+      "predisputant", "familist",
+      "preaffiliate", "friarhood",
+      "corelysis", "zoonitic",
+      "halloo", "paunchy",
+      "neuromimesis", "aconitine",
+      "hackneyed", "unfeeble",
+      "cubby", "autoschediastical",
+      "naprapath", "lyrebird",
+      "inexistency", "leucophoenicite",
+      "ferrogoslarite", "reperuse",
+      "uncombable", "tambo",
+      "propodiale", "diplomatize",
+      "Russifier", "clanned",
+      "corona", "michigan",
+      "nonutilitarian", "transcorporeal",
+      "bought", "Cercosporella",
+      "stapedius", "glandularly",
+      "pictorially", "weism",
+      "disilane", "rainproof",
+      "Caphtor", "scrubbed",
+      "oinomancy", "pseudoxanthine",
+      "nonlustrous", "redesertion",
+      "Oryzorictinae", "gala",
+      "Mycogone", "reappreciate",
+      "cyanoguanidine", "seeingness",
+      "breadwinner", "noreast",
+      "furacious", "epauliere",
+      "omniscribent", "Passiflorales",
+      "uninductive", "inductivity",
+      "Orbitolina", "Semecarpus",
+      "migrainoid", "steprelationship",
+      "phlogisticate", "mesymnion",
+      "sloped", "edificator",
+      "beneficent", "culm",
+      "paleornithology", "unurban",
+      "throbless", "amplexifoliate",
+      "sesquiquintile", "sapience",
+      "astucious", "dithery",
+      "boor", "ambitus",
+      "scotching", "uloid",
+      "uncompromisingness", "hoove",
+      "waird", "marshiness",
+      "Jerusalem", "mericarp",
+      "unevoked", "benzoperoxide",
+      "outguess", "pyxie",
+      "hymnic", "euphemize",
+      "mendacity", "erythremia",
+      "rosaniline", "unchatteled",
+      "lienteria", "Bushongo",
+      "dialoguer", "unrepealably",
+      "rivethead", "antideflation",
+      "vinegarish", "manganosiderite",
+      "doubtingness", "ovopyriform",
+      "Cephalodiscus", "Muscicapa",
+      "Animalivora", "angina",
+      "planispheric", "ipomoein",
+      "cuproiodargyrite", "sandbox",
+      "scrat", "Munnopsidae",
+      "shola", "pentafid",
+      "overstudiousness", "times",
+      "nonprofession", "appetible",
+      "valvulotomy", "goladar",
+      "uniarticular", "oxyterpene",
+      "unlapsing", "omega",
+      "trophonema", "seminonflammable",
+      "circumzenithal", "starer",
+      "depthwise", "liberatress",
+      "unleavened", "unrevolting",
+      "groundneedle", "topline",
+      "wandoo", "umangite",
+      "ordinant", "unachievable",
+      "oversand", "snare",
+      "avengeful", "unexplicit",
+      "mustafina", "sonable",
+      "rehabilitative", "eulogization",
+      "papery", "technopsychology",
+      "impressor", "cresylite",
+      "entame", "transudatory",
+      "scotale", "pachydermatoid",
+      "imaginary", "yeat",
+      "slipped", "stewardship",
+      "adatom", "cockstone",
+      "skyshine", "heavenful",
+      "comparability", "exprobratory",
+      "dermorhynchous", "parquet",
+      "cretaceous", "vesperal",
+      "raphis", "undangered",
+      "Glecoma", "engrain",
+      "counteractively", "Zuludom",
+      "orchiocatabasis", "Auriculariales",
+      "warriorwise", "extraorganismal",
+      "overbuilt", "alveolite",
+      "tetchy", "terrificness",
+      "widdle", "unpremonished",
+      "rebilling", "sequestrum",
+      "equiconvex", "heliocentricism",
+      "catabaptist", "okonite",
+      "propheticism", "helminthagogic",
+      "calycular", "giantly",
+      "wingable", "golem",
+      "unprovided", "commandingness",
+      "greave", "haply",
+      "doina", "depressingly",
+      "subdentate", "impairment",
+      "decidable", "neurotrophic",
+      "unpredict", "bicorporeal",
+      "pendulant", "flatman",
+      "intrabred", "toplike",
+      "Prosobranchiata", "farrantly",
+      "toxoplasmosis", "gorilloid",
+      "dipsomaniacal", "aquiline",
+      "atlantite", "ascitic",
+      "perculsive", "prospectiveness",
+      "saponaceous", "centrifugalization",
+      "dinical", "infravaginal",
+      "beadroll", "affaite",
+      "Helvidian", "tickleproof",
+      "abstractionism", "enhedge",
+      "outwealth", "overcontribute",
+      "coldfinch", "gymnastic",
+      "Pincian", "Munychian",
+      "codisjunct", "quad",
+      "coracomandibular", "phoenicochroite",
+      "amender", "selectivity",
+      "putative", "semantician",
+      "lophotrichic", "Spatangoidea",
+      "saccharogenic", "inferent",
+      "Triconodonta", "arrendation",
+      "sheepskin", "taurocolla",
+      "bunghole", "Machiavel",
+      "triakistetrahedral", "dehairer",
+      "prezygapophysial", "cylindric",
+      "pneumonalgia", "sleigher",
+      "emir", "Socraticism",
+      "licitness", "massedly",
+      "instructiveness", "sturdied",
+      "redecrease", "starosta",
+      "evictor", "orgiastic",
+      "squdge", "meloplasty",
+      "Tsonecan", "repealableness",
+      "swoony", "myesthesia",
+      "molecule", "autobiographist",
+      "reciprocation", "refective",
+      "unobservantness", "tricae",
+      "ungouged", "floatability",
+      "Mesua", "fetlocked",
+      "chordacentrum", "sedentariness",
+      "various", "laubanite",
+      "nectopod", "zenick",
+      "sequentially", "analgic",
+      "biodynamics", "posttraumatic",
+      "nummi", "pyroacetic",
+      "bot", "redescend",
+      "dispermy", "undiffusive",
+      "circular", "trillion",
+      "Uraniidae", "ploration",
+      "discipular", "potentness",
+      "sud", "Hu",
+      "Eryon", "plugger",
+      "subdrainage", "jharal",
+      "abscission", "supermarket",
+      "countergabion", "glacierist",
+      "lithotresis", "minniebush",
+      "zanyism", "eucalypteol",
+      "sterilely", "unrealize",
+      "unpatched", "hypochondriacism",
+      "critically", "cheesecutter",
+     };
+  }
+
+  /**
+   * This is the main routine for launching a distributed random write job.
+   * Each map writes the configured number of bytes of random text to a
+   * DFS file; the job runs with no reduces.
+   * 
+   * @throws IOException 
+   */
+  public int run(String[] args) throws Exception {    
+    if (args.length == 0) {
+      return printUsage();    
+    }
+    Job job = createJob(getConf());
+    FileOutputFormat.setOutputPath(job, new Path(args[0]));
+    Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
+    Date endTime = new Date();
+    System.out.println("Job ended: " + endTime);
+    System.out.println("The job took " + 
+                       (endTime.getTime() - startTime.getTime()) /1000 + 
+                       " seconds.");
+    
+    return ret;
+  }
+
+  static int printUsage() {
+    System.out.println("randomtextwriter " +
+                       "[-outFormat <output format class>] " + 
+                       "<output>");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return 2;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new RandomTextWriterJob(),
+        args);
+    System.exit(res);
+  }
+}

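Besides the ToolRunner entry point in main(), the job can be driven
programmatically through createJob(Configuration). A minimal sketch, with the
byte counts and output path chosen purely as examples:

    package org.apache.hadoop;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class RandomTextWriterDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setLong(RandomTextWriterJob.TOTAL_BYTES, 1024 * 1024);  // 1 MB total
        conf.setLong(RandomTextWriterJob.BYTES_PER_MAP, 256 * 1024); // => 4 maps
        Job job = new RandomTextWriterJob().createJob(conf);
        FileOutputFormat.setOutputPath(job, new Path("random-output")); // example
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
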
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/SleepJob.java?rev=1126981&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/SleepJob.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/SleepJob.java Tue May 24 11:19:25 2011
@@ -0,0 +1,273 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Dummy class for testing the MR framework. Sleeps for a defined period
+ * of time in the mapper and reducer. Generates fake input for map / reduce
+ * jobs. Note that the generated number of input pairs is on the order
+ * of <code>numMappers * mapSleepTime / 100</code>, so the job uses
+ * some disk space.
+ */
+public class SleepJob extends Configured implements Tool {
+  public static String MAP_SLEEP_COUNT = "mapreduce.sleepjob.map.sleep.count";
+  public static String REDUCE_SLEEP_COUNT = 
+    "mapreduce.sleepjob.reduce.sleep.count";
+  public static String MAP_SLEEP_TIME = "mapreduce.sleepjob.map.sleep.time";
+  public static String REDUCE_SLEEP_TIME = 
+    "mapreduce.sleepjob.reduce.sleep.time";
+
+  public static class SleepJobPartitioner extends 
+      Partitioner<IntWritable, NullWritable> {
+    public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+      return k.get() % numPartitions;
+    }
+  }
+  
+  public static class EmptySplit extends InputSplit implements Writable {
+    public void write(DataOutput out) throws IOException { }
+    public void readFields(DataInput in) throws IOException { }
+    public long getLength() { return 0L; }
+    public String[] getLocations() { return new String[0]; }
+  }
+
+  public static class SleepInputFormat 
+      extends InputFormat<IntWritable,IntWritable> {
+    
+    public List<InputSplit> getSplits(JobContext jobContext) {
+      List<InputSplit> ret = new ArrayList<InputSplit>();
+      int numSplits = jobContext.getConfiguration().
+                        getInt(MRJobConfig.NUM_MAPS, 1);
+      for (int i = 0; i < numSplits; ++i) {
+        ret.add(new EmptySplit());
+      }
+      return ret;
+    }
+    
+    public RecordReader<IntWritable,IntWritable> createRecordReader(
+        InputSplit ignored, TaskAttemptContext taskContext)
+        throws IOException {
+      Configuration conf = taskContext.getConfiguration();
+      final int count = conf.getInt(MAP_SLEEP_COUNT, 1);
+      if (count < 0) throw new IOException("Invalid map count: " + count);
+      final int redcount = conf.getInt(REDUCE_SLEEP_COUNT, 1);
+      if (redcount < 0)
+        throw new IOException("Invalid reduce count: " + redcount);
+      final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
+      
+      return new RecordReader<IntWritable,IntWritable>() {
+        private int records = 0;
+        private int emitCount = 0;
+        private IntWritable key = null;
+        private IntWritable value = null;
+        public void initialize(InputSplit split, TaskAttemptContext context) {
+        }
+
+        public boolean nextKeyValue()
+            throws IOException {
+          key = new IntWritable();
+          key.set(emitCount);
+          // Distribute emitPerMapTask emits evenly across the count records;
+          // the first (emitPerMapTask % count) records emit one extra.
+          int emit = emitPerMapTask / count;
+          if (emitPerMapTask % count > records) {
+            ++emit;
+          }
+          emitCount += emit;
+          value = new IntWritable();
+          value.set(emit);
+          return records++ < count;
+        }
+        public IntWritable getCurrentKey() { return key; }
+        public IntWritable getCurrentValue() { return value; }
+        public void close() throws IOException { }
+        public float getProgress() throws IOException {
+          return records / ((float)count);
+        }
+      };
+    }
+  }
+
+  public static class SleepMapper 
+      extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+    private long mapSleepDuration = 100;
+    private int mapSleepCount = 1;
+    private int count = 0;
+
+    protected void setup(Context context) 
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.mapSleepCount =
+        conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
+      this.mapSleepDuration =
+        conf.getLong(MAP_SLEEP_TIME, 100) / mapSleepCount;
+    }
+
+    public void map(IntWritable key, IntWritable value, Context context)
+        throws IOException, InterruptedException {
+      // It is expected that every map processes mapSleepCount records.
+      try {
+        context.setStatus("Sleeping... (" +
+          (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+        Thread.sleep(mapSleepDuration);
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+            "Interrupted while sleeping").initCause(ex);
+      }
+      ++count;
+      // Each map outputs reduceSleepCount * numReduces keys in total, so
+      // that each reducer ends up with reduceSleepCount keys.
+      int k = key.get();
+      for (int i = 0; i < value.get(); ++i) {
+        context.write(new IntWritable(k + i), NullWritable.get());
+      }
+    }
+  }
+  
+  public static class SleepReducer  
+      extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
+    private long reduceSleepDuration = 100;
+    private int reduceSleepCount = 1;
+    private int count = 0;
+
+    protected void setup(Context context) 
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.reduceSleepCount =
+        conf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+      this.reduceSleepDuration =
+        conf.getLong(REDUCE_SLEEP_TIME, 100) / reduceSleepCount;
+    }
+
+    public void reduce(IntWritable key, Iterable<NullWritable> values,
+                       Context context)
+      throws IOException {
+      try {
+        context.setStatus("Sleeping... (" +
+            (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+        Thread.sleep(reduceSleepDuration);
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+          "Interrupted while sleeping").initCause(ex);
+      }
+      count++;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new SleepJob(), args);
+    System.exit(res);
+  }
+
+  public Job createJob(int numMapper, int numReducer, 
+                       long mapSleepTime, int mapSleepCount, 
+                       long reduceSleepTime, int reduceSleepCount) 
+      throws IOException {
+    Configuration conf = getConf();
+    conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
+    conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
+    conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
+    conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+    conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
+    Job job = Job.getInstance(conf, "sleep");
+    job.setNumReduceTasks(numReducer);
+    job.setJarByClass(SleepJob.class);
+    job.setMapperClass(SleepMapper.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setReducerClass(SleepReducer.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setInputFormatClass(SleepInputFormat.class);
+    job.setPartitionerClass(SleepJobPartitioner.class);
+    job.setSpeculativeExecution(false);
+    job.setJobName("Sleep job");
+    FileInputFormat.addInputPath(job, new Path("ignored"));
+    return job;
+  }
+
+  public int run(String[] args) throws Exception {
+
+    if (args.length < 1) {
+      System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
+          " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
+          " [-recordt recordSleepTime (msec)]");
+      ToolRunner.printGenericCommandUsage(System.err);
+      return 2;
+    }
+
+    int numMapper = 1, numReducer = 1;
+    long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;
+    int mapSleepCount = 1, reduceSleepCount = 1;
+
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-m")) {
+        numMapper = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-r")) {
+        numReducer = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-mt")) {
+        mapSleepTime = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-rt")) {
+        reduceSleepTime = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-recordt")) {
+        recSleepTime = Long.parseLong(args[++i]);
+      }
+    }
+    
+    // Derive the per-task record counts so that each task sleeps for a
+    // total of *SleepTime, at roughly recSleepTime per record.
+    mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
+    reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
+    Job job = createJob(numMapper, numReducer, mapSleepTime,
+                mapSleepCount, reduceSleepTime, reduceSleepCount);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+}
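
For reference, the class above can be driven through its command-line options or programmatically. A minimal sketch of the programmatic route (the driver class name and option values here are illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class SleepJobDriver {
      public static void main(String[] args) throws Exception {
        // Roughly: 3 maps sleeping 10s each, 1 reduce sleeping 5s,
        // waking once per second per record. Equivalent to:
        //   hadoop jar <tests-jar> org.apache.hadoop.SleepJob \
        //       -m 3 -r 1 -mt 10000 -rt 5000 -recordt 1000
        int rc = ToolRunner.run(new Configuration(),
            new org.apache.hadoop.SleepJob(),
            new String[] { "-m", "3", "-r", "1", "-mt", "10000",
                           "-rt", "5000", "-recordt", "1000" });
        System.exit(rc);
      }
    }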

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Tue May 24 11:19:25 2011
@@ -19,8 +19,13 @@
 package org.apache.hadoop.mapreduce.v2;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
 
 import junit.framework.Assert;
 
@@ -32,20 +37,29 @@ import org.apache.hadoop.RandomTextWrite
 import org.apache.hadoop.SleepJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -64,8 +78,22 @@ public class TestMRJobs {
 
   protected static MiniMRYarnCluster mrCluster;
 
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  private static Path TEST_ROOT_DIR = new Path("target",
+      TestMRJobs.class.getName() + "-tmpDir").makeQualified(localFs);
+  static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws IOException {
 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -82,6 +110,11 @@ public class TestMRJobs {
     // TestMRJobs is for testing non-uberized operation only; see TestUberAM
     // for corresponding uberized tests.
     mrCluster.getConfig().setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+    // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
+    // work around the absent public distributed cache.
+    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+    localFs.setPermission(APP_JAR, new FsPermission("700"));
   }
 
   @AfterClass
@@ -112,9 +145,9 @@ public class TestMRJobs {
     // job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each:
     Job job = sleepJob.createJob(3, numReduces, 10000, 1, 5000, 1);
 
-    // TODO: We should not be setting MRAppJar as job.jar. It should be
-    // uploaded separately by YarnRunner.
-    job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.setJarByClass(SleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
     job.waitForCompletion(true);
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
 
@@ -141,9 +174,9 @@ public class TestMRJobs {
     Path outputDir = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
         "random-output");
     FileOutputFormat.setOutputPath(job, outputDir);
-    // TODO: We should not be setting MRAppJar as job.jar. It should be
-    // uploaded separately by YarnRunner.
-    job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.setJarByClass(RandomTextWriterJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
     job.waitForCompletion(true);
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
     // Make sure there are three files in the output-dir
@@ -219,9 +252,7 @@ public class TestMRJobs {
     FileOutputFormat.setOutputPath(job,
         new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
         "failmapper-output"));
-    // TODO: We should not be setting MRAppJar as job.jar. It should be
-    // uploaded separately by YarnRunner.
-    job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.waitForCompletion(true);
 
     return job;
@@ -264,9 +295,7 @@ public class TestMRJobs {
         Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0);
         // //Job with reduces
         // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
-        // TODO: We should not be setting MRAppJar as job.jar. It should be
-        // uploaded separately by YarnRunner.
-        job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
         job.waitForCompletion(true);
         Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
         return null;
@@ -276,4 +305,105 @@ public class TestMRJobs {
     // TODO later:  add explicit "isUber()" checks of some sort
   }
 
+  public static class DistributedCacheChecker extends
+      Mapper<LongWritable, Text, NullWritable, NullWritable> {
+
+    @Override
+    public void setup(Context context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      Path[] files = context.getLocalCacheFiles();
+      Path[] archives = context.getLocalCacheArchives();
+      LocalFileSystem fs = FileSystem.getLocal(conf);
+
+      // Check that 3 files (2 + the app jar) and 2 archives are present
+      Assert.assertEquals(3, files.length);
+      Assert.assertEquals(2, archives.length);
+
+      // Check lengths of the files
+      Assert.assertEquals(1, fs.getFileStatus(files[0]).getLen());
+      Assert.assertTrue(fs.getFileStatus(files[1]).getLen() > 1);
+
+      // Check extraction of the archive
+      Assert.assertTrue(fs.exists(new Path(archives[0],
+          "distributed.jar.inside3")));
+      Assert.assertTrue(fs.exists(new Path(archives[1],
+          "distributed.jar.inside4")));
+
+      // Check the class loaders
+      LOG.info("Java Classpath: " + System.getProperty("java.class.path"));
+      ClassLoader cl = Thread.currentThread().getContextClassLoader();
+      // Both the file and the archive should have been added to classpath, so
+      // both should be reachable via the class loader.
+      Assert.assertNotNull(cl.getResource("distributed.jar.inside2"));
+      Assert.assertNotNull(cl.getResource("distributed.jar.inside3"));
+      Assert.assertNull(cl.getResource("distributed.jar.inside4"));
+
+      // Check that the symlink for the renamed file was created in the cwd.
+      File symlinkFile = new File("distributed.first.symlink");
+      Assert.assertTrue(symlinkFile.exists());
+      Assert.assertEquals(1, symlinkFile.length());
+    }
+  }
+
+  @Test
+  public void testDistributedCache() throws Exception {
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+           + " not found. Not running test.");
+      return;
+    }
+
+    // Create a temporary file of length 1.
+    Path first = createTempFile("distributed.first", "x");
+    // Create three jars, each with a single entry inside.
+    Path second =
+        makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
+    Path third =
+        makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
+    Path fourth =
+        makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
+
+    Job job = Job.getInstance(mrCluster.getConfig());
+    job.setJarByClass(DistributedCacheChecker.class);
+    job.setMapperClass(DistributedCacheChecker.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+
+    FileInputFormat.setInputPaths(job, first);
+    // Set up the distributed cache entries.
+    job.addCacheFile(
+        new URI(first.toUri().toString() + "#distributed.first.symlink"));
+    job.addFileToClassPath(second);
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.addArchiveToClassPath(third);
+    job.addCacheArchive(fourth.toUri());
+    job.createSymlink();
+    job.setMaxMapAttempts(1); // speed up failures
+
+    job.submit();
+    Assert.assertTrue(job.waitForCompletion(false));
+  }
+
+  private Path createTempFile(String filename, String contents)
+      throws IOException {
+    Path path = new Path(TEST_ROOT_DIR, filename);
+    FSDataOutputStream os = localFs.create(path);
+    os.writeBytes(contents);
+    os.close();
+    localFs.setPermission(path, new FsPermission("700"));
+    return path;
+  }
+
+  private Path makeJar(Path p, int index) throws FileNotFoundException,
+      IOException {
+    FileOutputStream fos =
+        new FileOutputStream(new File(p.toUri().getPath()));
+    JarOutputStream jos = new JarOutputStream(fos);
+    ZipEntry ze = new ZipEntry("distributed.jar.inside" + index);
+    jos.putNextEntry(ze);
+    jos.write(("inside the jar!" + index).getBytes());
+    jos.closeEntry();
+    jos.close();
+    localFs.setPermission(p, new FsPermission("700"));
+    return p;
+  }
 }
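
The testDistributedCache case above exercises the job-level distributed-cache API end to end. A condensed sketch of the client-side pattern it verifies, with hypothetical paths (only calls that appear in the test above are used):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    public class CacheSetupSketch {
      public static Job configure() throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Localize a plain file; the #fragment names the symlink that
        // shows up in each task's working directory.
        job.addCacheFile(new URI("/data/lookup-v1.dat#lookup.dat"));
        // Localize a jar and add it to the task classpath.
        job.addFileToClassPath(new Path("/libs/helper.jar"));
        // Localize and unpack an archive, also added to the classpath.
        job.addArchiveToClassPath(new Path("/libs/resources.jar"));
        // Localize and unpack an archive without touching the classpath.
        job.addCacheArchive(new URI("/libs/extra.jar"));
        job.createSymlink(); // required for the #fragment symlink above
        return job;
      }
    }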

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java Tue May 24 11:19:25 2011
@@ -26,8 +26,12 @@ import junit.framework.Assert;
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.FailingMapper;
 import org.apache.hadoop.SleepJob;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -44,6 +48,20 @@ public class TestMRJobsWithHistoryServic
 
   private static MiniMRYarnCluster mrCluster;
 
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  private static Path TEST_ROOT_DIR = new Path("target",
+      TestMRJobsWithHistoryService.class.getName() + "-tmpDir")
+      .makeQualified(localFs);
+  static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+
   @Before
   public void setup() throws InterruptedException, IOException {
 
@@ -58,6 +76,11 @@ public class TestMRJobsWithHistoryServic
       mrCluster.init(new Configuration());
       mrCluster.start();
     }
+
+    // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
+    // work around the absent public distributed cache.
+    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+    localFs.setPermission(APP_JAR, new FsPermission("700"));
   }
 
   @After
@@ -86,7 +109,8 @@ public class TestMRJobsWithHistoryServic
     sleepJob.setConf(mrCluster.getConfig());
     // Job with 3 maps and 2 reduces
     Job job = sleepJob.createJob(3, 2, 1000, 1, 500, 1);
-    job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+    job.setJarByClass(SleepJob.class);
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.waitForCompletion(true);
     Counters counterMR = job.getCounters();
     ApplicationId appID = TypeConverter.toYarn(job.getJobID()).getAppId();

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java Tue May 24 11:19:25 2011
@@ -33,13 +33,14 @@ import org.apache.hadoop.mapreduce.TaskC
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 public class TestUberAM extends TestMRJobs {
 
   private static final Log LOG = LogFactory.getLog(TestUberAM.class);
 
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws IOException {
     TestMRJobs.setup();
     if (mrCluster != null) {
     	mrCluster.getConfig().setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
@@ -109,4 +110,10 @@ public class TestUberAM extends TestMRJo
     super.testSleepJobWithSecurityOn();
   }
 
+  // TODO: add a distributed-cache test for uber mode; this empty override
+  // disables the inherited non-uber test for now.
+  @Override
+  @Test
+  public void testDistributedCache() throws Exception {
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java Tue May 24 11:19:25 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.Clock;
+
 /**
  * A clock class - can be mocked out for testing.
  */
@@ -33,7 +35,7 @@ class SimulatorClock extends Clock {
   }
 
   @Override
-  long getTime() {
+  public long getTime() {
     return currentTime;
   }
 }
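
The getTime() override above is widened to public so it can override the Clock class now imported from org.apache.hadoop. In isolation, the settable-clock pattern it implements looks like this (a self-contained illustration that does not use the real Clock class):

    public class SettableClockExample {
      // Test double for a wall clock: time only moves when told to.
      static class SettableClock {
        private long now;
        SettableClock(long start) { this.now = start; }
        public long getTime() { return now; }
        public void advance(long millis) { now += millis; }
      }

      public static void main(String[] args) {
        SettableClock clock = new SettableClock(1000L);
        clock.advance(500L);
        System.out.println(clock.getTime()); // prints 1500
      }
    }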

Modified: hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java Tue May 24 11:19:25 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapred.JobStatu
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.Clock;
 import org.apache.hadoop.mapred.SimulatorJobInProgress;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.security.Credentials;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue May 24 11:19:25 2011
@@ -235,9 +235,7 @@ public class ContainerManagerImpl extend
   public StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException {
     ContainerLaunchContext launchContext = request.getContainerLaunchContext();
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(" container is " + request);
-    }
+    LOG.info(" container is " + request);
 
     // parse credentials
     ByteBuffer tokens = launchContext.getContainerTokens();