Posted to mapreduce-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:52 UTC

svn commit: r1619012 [7/7] - in /hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project: ./ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.mapreduce.lib.jobcontrol;
 
-import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
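
Note on the assertion changes throughout this commit: junit.framework.Assert is
the deprecated JUnit 3 API, and org.junit.Assert is its JUnit 4 replacement. A
minimal sketch of the resulting usage (the test class and values here are
hypothetical; only the imports mirror the diff):

    import static org.junit.Assert.assertEquals; // was junit.framework.Assert
    import static org.junit.Assert.assertTrue;

    import org.junit.Test;

    public class AssertMigrationSketch {
      @Test
      public void testSums() {
        assertEquals("unexpected sum", 4, 2 + 2);
        assertTrue("flag should be set", 2 + 2 == 4);
      }
    }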

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java Tue Aug 19 23:49:39 2014
@@ -26,7 +26,7 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -198,6 +198,11 @@ public class TestJHSSecurity {
         fail("Unexpected exception" + e);
       }
       cancelDelegationToken(loggedInUser, hsService, token);
+
+      // Testing the token with different renewer to cancel the token
+      Token tokenWithDifferentRenewer = getDelegationToken(loggedInUser,
+          hsService, "yarn");
+      cancelDelegationToken(loggedInUser, hsService, tokenWithDifferentRenewer);
       if (clientUsingDT != null) {
 //        RPC.stopProxy(clientUsingDT);
         clientUsingDT = null;
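
The added assertion exercises cancelling a JobHistoryServer delegation token
whose renewer ("yarn") differs from the requesting user. getDelegationToken and
cancelDelegationToken are private helpers of this test; a hedged sketch of what
such helpers plausibly wrap via the public MRClientProtocol records (the method
shapes below are an assumption, not the test's actual code):

    import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
    import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
    import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
    import org.apache.hadoop.yarn.api.records.Token;
    import org.apache.hadoop.yarn.util.Records;

    public class JHSTokenSketch {
      // Request a token naming an arbitrary renewer, e.g. "yarn".
      static Token getToken(MRClientProtocol hsProxy, String renewer) throws Exception {
        GetDelegationTokenRequest req =
            Records.newRecord(GetDelegationTokenRequest.class);
        req.setRenewer(renewer);
        return hsProxy.getDelegationToken(req).getDelegationToken();
      }

      // The owner can cancel the token even though it named another renewer,
      // which is the behavior the new assertion pins down.
      static void cancelToken(MRClientProtocol hsProxy, Token token) throws Exception {
        CancelDelegationTokenRequest req =
            Records.newRecord(CancelDelegationTokenRequest.class);
        req.setDelegationToken(token);
        hsProxy.cancelDelegationToken(req);
      }
    }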

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAMWithNonNormalizedCapabilities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAMWithNonNormalizedCapabilities.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAMWithNonNormalizedCapabilities.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAMWithNonNormalizedCapabilities.java Tue Aug 19 23:49:39 2014
@@ -21,7 +21,7 @@ package org.apache.hadoop.mapreduce.v2;
 import java.io.File;
 import java.io.IOException;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -118,4 +118,4 @@ public class TestMRAMWithNonNormalizedCa
       mrCluster.stop();
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Tue Aug 19 23:49:39 2014
@@ -33,8 +33,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
-import org.apache.commons.io.FileUtils;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.FailingMapper;
@@ -77,6 +77,10 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.ApplicationClassLoader;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
@@ -210,7 +215,19 @@ public class TestMRJobs {
   @Test(timeout = 300000)
   public void testJobClassloader() throws IOException, InterruptedException,
       ClassNotFoundException {
-    LOG.info("\n\n\nStarting testJobClassloader().");
+    testJobClassloader(false);
+  }
+
+  @Test(timeout = 300000)
+  public void testJobClassloaderWithCustomClasses() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    testJobClassloader(true);
+  }
+
+  private void testJobClassloader(boolean useCustomClasses) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    LOG.info("\n\n\nStarting testJobClassloader()"
+        + " useCustomClasses=" + useCustomClasses);
 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -221,6 +238,19 @@ public class TestMRJobs {
     // set master address to local to test that local mode applied iff framework == local
     sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
     sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
+    if (useCustomClasses) {
+      // to test AM loading user classes such as output format class, we want
+      // to blacklist them from the system classes (they need to be prepended
+      // as the first match wins)
+      String systemClasses =
+          sleepConf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
+      // exclude the custom classes from system classes
+      systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" +
+          CustomSpeculator.class.getName() + "," +
+          systemClasses;
+      sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
+          systemClasses);
+    }
     sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
     sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
     sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
@@ -233,12 +263,66 @@ public class TestMRJobs {
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.setJarByClass(SleepJob.class);
     job.setMaxMapAttempts(1); // speed up failures
+    if (useCustomClasses) {
+      // set custom output format class and speculator class
+      job.setOutputFormatClass(CustomOutputFormat.class);
+      final Configuration jobConf = job.getConfiguration();
+      jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class,
+          Speculator.class);
+      // speculation needs to be enabled for the speculator to be loaded
+      jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
+    }
     job.submit();
     boolean succeeded = job.waitForCompletion(true);
     Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
         succeeded);
   }
 
+  public static class CustomOutputFormat<K,V> extends NullOutputFormat<K,V> {
+    public CustomOutputFormat() {
+      verifyClassLoader(getClass());
+    }
+
+    /**
+     * Verifies that the class was loaded by the job classloader if it is in the
+     * context of the MRAppMaster, and if not throws an exception to fail the
+     * job.
+     */
+    private void verifyClassLoader(Class<?> cls) {
+      // to detect that it is instantiated in the context of the MRAppMaster, we
+      // inspect the stack trace and determine whether a caller is MRAppMaster
+      for (StackTraceElement e: new Throwable().getStackTrace()) {
+        if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+            !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+          throw new ExceptionInInitializerError("incorrect classloader used");
+        }
+      }
+    }
+  }
+
+  public static class CustomSpeculator extends DefaultSpeculator {
+    public CustomSpeculator(Configuration conf, AppContext context) {
+      super(conf, context);
+      verifyClassLoader(getClass());
+    }
+
+    /**
+     * Verifies that the class was loaded by the job classloader if it is in the
+     * context of the MRAppMaster, and if not throws an exception to fail the
+     * job.
+     */
+    private void verifyClassLoader(Class<?> cls) {
+      // to detect that it is instantiated in the context of the MRAppMaster, we
+      // inspect the stack trace and determine whether a caller is MRAppMaster
+      for (StackTraceElement e: new Throwable().getStackTrace()) {
+        if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+            !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+          throw new ExceptionInInitializerError("incorrect classloader used");
+        }
+      }
+    }
+  }
+
   protected void verifySleepJobCounters(Job job) throws InterruptedException,
       IOException {
     Counters counters = job.getCounters();
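
The blacklisting comment in testJobClassloader relies on the semantics of
mapreduce.job.classloader.system.classes: entries are consulted in order, a
leading "-" negates an entry, and the first match wins, which is why the custom
classes must be prepended. An illustrative reimplementation of that matching
rule (a sketch of the documented behavior, not the actual
ApplicationClassLoader code, which also handles resource-style names):

    public class SystemClassMatcherSketch {
      static boolean isSystemClass(String name, String[] systemClasses) {
        for (String entry : systemClasses) {
          boolean negated = entry.startsWith("-");
          String pattern = negated ? entry.substring(1) : entry;
          boolean matched = pattern.endsWith(".")
              ? name.startsWith(pattern) // package prefix, e.g. "org.apache.hadoop."
              : name.equals(pattern);    // exact class name
          if (matched) {
            return !negated;             // first match decides
          }
        }
        return false; // unmatched classes are loaded by the job classloader
      }

      public static void main(String[] args) {
        String[] classes = {"-com.example.CustomOutputFormat", "org.apache.hadoop."};
        // The negated entry comes first, so the AM loads the custom class from the job jar:
        System.out.println(isSystemClass("com.example.CustomOutputFormat", classes)); // false
        System.out.println(isSystemClass("org.apache.hadoop.mapreduce.Job", classes)); // true
      }
    }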

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.avro.AvroRemoteException;
 import org.apache.commons.logging.Log;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,8 @@ import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import junit.framework.Assert;
+import org.junit.AfterClass;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,8 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestMRJobsWithProfiler {
@@ -51,6 +51,8 @@ public class TestMRJobsWithProfiler {
   private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
     EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
 
+  private static final int PROFILED_TASK_ID = 1;
+
   private static MiniMRYarnCluster mrCluster;
 
   private static final Configuration CONF = new Configuration();
@@ -69,8 +71,8 @@ public class TestMRJobsWithProfiler {
 
   private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
 
-  @Before
-  public void setup() throws InterruptedException, IOException {
+  @BeforeClass
+  public static void setup() throws InterruptedException, IOException {
 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -79,7 +81,7 @@ public class TestMRJobsWithProfiler {
     }
 
     if (mrCluster == null) {
-      mrCluster = new MiniMRYarnCluster(getClass().getName());
+      mrCluster = new MiniMRYarnCluster(TestMRJobsWithProfiler.class.getName());
       mrCluster.init(CONF);
       mrCluster.start();
     }
@@ -90,8 +92,8 @@ public class TestMRJobsWithProfiler {
     localFs.setPermission(APP_JAR, new FsPermission("700"));
   }
 
-  @After
-  public void tearDown() {
+  @AfterClass
+  public static void tearDown() {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
           + " not found. Not running test.");
@@ -103,10 +105,19 @@ public class TestMRJobsWithProfiler {
     }
   }
 
+  @Test (timeout = 150000)
+  public void testDefaultProfiler() throws Exception {
+    LOG.info("Starting testDefaultProfiler");
+    testProfilerInternal(true);
+  }
 
   @Test (timeout = 150000)
-  public void testProfiler() throws IOException, InterruptedException,
-      ClassNotFoundException {
+  public void testDifferentProfilers() throws Exception {
+    LOG.info("Starting testDifferentProfilers");
+    testProfilerInternal(false);
+  }
+
+  private void testProfilerInternal(boolean useDefault) throws Exception {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
         + " not found. Not running test.");
@@ -117,18 +128,19 @@ public class TestMRJobsWithProfiler {
     final JobConf sleepConf = new JobConf(mrCluster.getConfig());
 
     sleepConf.setProfileEnabled(true);
-    // profile map split 1
-    sleepConf.setProfileTaskRange(true, "1");
-    // profile reduce of map output partitions 1
-    sleepConf.setProfileTaskRange(false, "1");
-
-    // use hprof for map to profile.out
-    sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
-        "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
-      + "file=%s");
+    sleepConf.setProfileTaskRange(true, String.valueOf(PROFILED_TASK_ID));
+    sleepConf.setProfileTaskRange(false, String.valueOf(PROFILED_TASK_ID));
+
+    if (!useDefault) {
+      // use hprof for map to profile.out
+      sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
+          "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
+              + "file=%s");
+
+      // use Xprof for reduce to stdout
+      sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
+    }
 
-    // use Xprof for reduce to stdout
-    sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
     sleepJob.setConf(sleepConf);
 
     // 2-map-2-reduce SleepJob
@@ -205,8 +217,8 @@ public class TestMRJobsWithProfiler {
         TaskLog.LogName.PROFILE.toString());
       final Path stdoutPath = new Path(dirEntry.getValue(),
         TaskLog.LogName.STDOUT.toString());
-      if (tid.getTaskType() == TaskType.MAP) {
-        if (tid.getTaskID().getId() == 1) {
+      if (useDefault || tid.getTaskType() == TaskType.MAP) {
+        if (tid.getTaskID().getId() == PROFILED_TASK_ID) {
           // verify profile.out
           final BufferedReader br = new BufferedReader(new InputStreamReader(
             localFs.open(profilePath)));
@@ -222,7 +234,8 @@ public class TestMRJobsWithProfiler {
       } else {
         Assert.assertFalse("hprof file should not exist",
           localFs.exists(profilePath));
-        if (tid.getTaskID().getId() == 1) {
+        if (tid.getTaskID().getId() == PROFILED_TASK_ID) {
+          // reducer is profiled with Xprof
           final BufferedReader br = new BufferedReader(new InputStreamReader(
             localFs.open(stdoutPath)));
           boolean flatProfFound = false;
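
For context, the profiling knobs the refactored test toggles are public JobConf
API. A minimal standalone sketch (the driver class is hypothetical; the
hprof/Xprof parameter strings match the test, and leaving the *_PROFILE_PARAMS
keys unset corresponds to the new useDefault case):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class ProfilerConfSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.setProfileEnabled(true);
        conf.setProfileTaskRange(true, "1");  // profile map task id 1
        conf.setProfileTaskRange(false, "1"); // profile reduce task id 1
        // Custom profilers, as in the useDefault == false branch:
        conf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
            "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,file=%s");
        conf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
      }
    }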

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -40,22 +40,26 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.Test;
 
+import com.google.common.base.Supplier;
+
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestSpeculativeExecutionWithMRApp {
 
   private static final int NUM_MAPPERS = 5;
   private static final int NUM_REDUCERS = 0;
 
-  @Test(timeout = 60000)
+  @Test
   public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
 
     Clock actualClock = new SystemClock();
-    ControlledClock clock = new ControlledClock(actualClock);
+    final ControlledClock clock = new ControlledClock(actualClock);
     clock.setTime(System.currentTimeMillis());
 
     MRApp app =
@@ -88,7 +92,7 @@ public class TestSpeculativeExecutionWit
 
     Random generator = new Random();
     Object[] taskValues = tasks.values().toArray();
-    Task taskToBeSpeculated =
+    final Task taskToBeSpeculated =
         (Task) taskValues[generator.nextInt(taskValues.length)];
 
     // Other than one random task, finish every other task.
@@ -105,30 +109,28 @@ public class TestSpeculativeExecutionWit
       }
     }
 
-    int maxTimeWait = 10;
-    boolean successfullySpeculated = false;
-    TaskAttempt[] ta = null;
-    while (maxTimeWait > 0 && !successfullySpeculated) {
-      if (taskToBeSpeculated.getAttempts().size() != 2) {
-        Thread.sleep(1000);
-        clock.setTime(System.currentTimeMillis() + 20000);
-      } else {
-        successfullySpeculated = true;
-        // finish 1st TA, 2nd will be killed
-        ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        if (taskToBeSpeculated.getAttempts().size() != 2) {
+          clock.setTime(System.currentTimeMillis() + 1000);
+          return false;
+        } else {
+          return true;
+        }
       }
-      maxTimeWait--;
-    }
-    Assert
-      .assertTrue("Couldn't speculate successfully", successfullySpeculated);
+    }, 1000, 60000);
+    // finish 1st TA, 2nd will be killed
+    TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
     verifySpeculationMessage(app, ta);
+    app.waitForState(Service.STATE.STOPPED);
   }
 
-  @Test(timeout = 60000)
+  @Test
   public void testSepculateSuccessfulWithUpdateEvents() throws Exception {
 
     Clock actualClock = new SystemClock();
-    ControlledClock clock = new ControlledClock(actualClock);
+    final ControlledClock clock = new ControlledClock(actualClock);
     clock.setTime(System.currentTimeMillis());
 
     MRApp app =
@@ -200,21 +202,21 @@ public class TestSpeculativeExecutionWit
       }
     }
 
-    int maxTimeWait = 5;
-    boolean successfullySpeculated = false;
-    TaskAttempt[] ta = null;
-    while (maxTimeWait > 0 && !successfullySpeculated) {
-      if (speculatedTask.getAttempts().size() != 2) {
-        Thread.sleep(1000);
-      } else {
-        successfullySpeculated = true;
-        ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
+    final Task speculatedTaskConst = speculatedTask;
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        if (speculatedTaskConst.getAttempts().size() != 2) {
+          clock.setTime(System.currentTimeMillis() + 1000);
+          return false;
+        } else {
+          return true;
+        }
       }
-      maxTimeWait--;
-    }
-    Assert
-      .assertTrue("Couldn't speculate successfully", successfullySpeculated);
+    }, 1000, 60000);
+    TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
     verifySpeculationMessage(app, ta);
+    app.waitForState(Service.STATE.STOPPED);
   }
 
   private static TaskAttempt[] makeFirstAttemptWin(
@@ -234,15 +236,7 @@ public class TestSpeculativeExecutionWit
   private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta)
       throws Exception {
     app.waitForState(ta[0], TaskAttemptState.SUCCEEDED);
-    app.waitForState(ta[1], TaskAttemptState.KILLED);
-    boolean foundSpecMsg = false;
-    for (String msg : ta[1].getDiagnostics()) {
-      if (msg.contains("Speculation")) {
-        foundSpecMsg = true;
-        break;
-      }
-    }
-    Assert.assertTrue("No speculation diagnostics!", foundSpecMsg);
+    // The speculative attempt may not be killed before the MR job succeeds.
   }
 
   private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,
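
The waitFor calls introduced above replace hand-rolled sleep loops:
GenericTestUtils.waitFor(check, checkEveryMillis, waitForMillis) polls the
Supplier and throws a TimeoutException if it never returns true within the
deadline. A condensed sketch of the pattern (the polled condition here is
hypothetical):

    import com.google.common.base.Supplier;

    import org.apache.hadoop.test.GenericTestUtils;

    public class WaitForSketch {
      private volatile int attempts; // stands in for task.getAttempts().size()

      void waitForSpeculativeAttempt() throws Exception {
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            return attempts == 2; // re-checked every second, up to one minute
          }
        }, 1000, 60000);
      }
    }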

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml Tue Aug 19 23:49:39 2014
@@ -35,12 +35,52 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-nodemanager</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
     </dependency>
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto</param>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>ShuffleHandlerRecovery.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
 import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
 import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
 import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
@@ -60,6 +62,8 @@ import org.apache.hadoop.io.DataInputByt
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
@@ -72,16 +76,27 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.api.AuxiliaryService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Logger;
+import org.iq80.leveldb.Options;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -113,8 +128,10 @@ import org.jboss.netty.handler.stream.Ch
 import org.jboss.netty.util.CharsetUtil;
 import org.mortbay.jetty.HttpHeaders;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
 
 public class ShuffleHandler extends AuxiliaryService {
 
@@ -132,6 +149,11 @@ public class ShuffleHandler extends Auxi
       "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
       Pattern.CASE_INSENSITIVE);
 
+  private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
+  private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version";
+  protected static final Version CURRENT_VERSION_INFO = 
+      Version.newInstance(1, 0);
+
   private int port;
   private ChannelFactory selector;
   private final ChannelGroup accepted = new DefaultChannelGroup();
@@ -149,14 +171,14 @@ public class ShuffleHandler extends Auxi
   private boolean shuffleTransferToAllowed;
   private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
 
+  private Map<String,String> userRsrc;
+  private JobTokenSecretManager secretManager;
+
+  private DB stateDb = null;
+
   public static final String MAPREDUCE_SHUFFLE_SERVICEID =
       "mapreduce_shuffle";
 
-  private static final Map<String,String> userRsrc =
-    new ConcurrentHashMap<String,String>();
-  private static final JobTokenSecretManager secretManager =
-    new JobTokenSecretManager();
-
   public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
   public static final int DEFAULT_SHUFFLE_PORT = 13562;
 
@@ -292,9 +314,7 @@ public class ShuffleHandler extends Auxi
       Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
        // TODO: Once Shuffle is out of NM, this can use MR APIs
       JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
-      userRsrc.put(jobId.toString(), user);
-      LOG.info("Added token for " + jobId.toString());
-      secretManager.addTokenForJob(jobId.toString(), jt);
+      recordJobShuffleInfo(jobId, user, jt);
     } catch (IOException e) {
       LOG.error("Error during initApp", e);
       // TODO add API to AuxiliaryServices to report failures
@@ -305,8 +325,12 @@ public class ShuffleHandler extends Auxi
   public void stopApplication(ApplicationTerminationContext context) {
     ApplicationId appId = context.getApplicationId();
     JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
-    secretManager.removeTokenForJob(jobId.toString());
-    userRsrc.remove(jobId.toString());
+    try {
+      removeJobShuffleInfo(jobId);
+    } catch (IOException e) {
+      LOG.error("Error during stopApp", e);
+      // TODO add API to AuxiliaryServices to report failures
+    }
   }
 
   @Override
@@ -350,6 +374,9 @@ public class ShuffleHandler extends Auxi
   @Override
   protected void serviceStart() throws Exception {
     Configuration conf = getConfig();
+    userRsrc = new ConcurrentHashMap<String,String>();
+    secretManager = new JobTokenSecretManager();
+    recoverState(conf);
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
     try {
       pipelineFact = new HttpPipelineFactory(conf);
@@ -389,6 +416,9 @@ public class ShuffleHandler extends Auxi
     if (pipelineFact != null) {
       pipelineFact.destroy();
     }
+    if (stateDb != null) {
+      stateDb.close();
+    }
     super.serviceStop();
   }
 
@@ -407,6 +437,191 @@ public class ShuffleHandler extends Auxi
     return new Shuffle(conf);
   }
 
+  private void recoverState(Configuration conf) throws IOException {
+    Path recoveryRoot = getRecoveryPath();
+    if (recoveryRoot != null) {
+      startStore(recoveryRoot);
+      Pattern jobPattern = Pattern.compile(JobID.JOBID_REGEX);
+      LeveldbIterator iter = null;
+      try {
+        iter = new LeveldbIterator(stateDb);
+        iter.seek(bytes(JobID.JOB));
+        while (iter.hasNext()) {
+          Map.Entry<byte[],byte[]> entry = iter.next();
+          String key = asString(entry.getKey());
+          if (!jobPattern.matcher(key).matches()) {
+            break;
+          }
+          recoverJobShuffleInfo(key, entry.getValue());
+        }
+      } catch (DBException e) {
+        throw new IOException("Database error during recovery", e);
+      } finally {
+        if (iter != null) {
+          iter.close();
+        }
+      }
+    }
+  }
+
+  private void startStore(Path recoveryRoot) throws IOException {
+    Options options = new Options();
+    options.createIfMissing(false);
+    options.logger(new LevelDBLogger());
+    Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
+    LOG.info("Using state database at " + dbPath + " for recovery");
+    File dbfile = new File(dbPath.toString());
+    try {
+      stateDb = JniDBFactory.factory.open(dbfile, options);
+    } catch (NativeDB.DBException e) {
+      if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+        LOG.info("Creating state database at " + dbfile);
+        options.createIfMissing(true);
+        try {
+          stateDb = JniDBFactory.factory.open(dbfile, options);
+          storeVersion();
+        } catch (DBException dbExc) {
+          throw new IOException("Unable to create state store", dbExc);
+        }
+      } else {
+        throw e;
+      }
+    }
+    checkVersion();
+  }
+  
+  @VisibleForTesting
+  Version loadVersion() throws IOException {
+    byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
+    // if version is not stored previously, treat it as 1.0.
+    if (data == null || data.length == 0) {
+      return Version.newInstance(1, 0);
+    }
+    Version version =
+        new VersionPBImpl(VersionProto.parseFrom(data));
+    return version;
+  }
+
+  private void storeSchemaVersion(Version version) throws IOException {
+    String key = STATE_DB_SCHEMA_VERSION_KEY;
+    byte[] data = 
+        ((VersionPBImpl) version).getProto().toByteArray();
+    try {
+      stateDb.put(bytes(key), data);
+    } catch (DBException e) {
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+  
+  private void storeVersion() throws IOException {
+    storeSchemaVersion(CURRENT_VERSION_INFO);
+  }
+  
+  // Only used for test
+  @VisibleForTesting
+  void storeVersion(Version version) throws IOException {
+    storeSchemaVersion(version);
+  }
+
+  protected Version getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+  
+  /**
+   * 1) Versioning scheme: major.minor, e.g. 1.0, 1.1, 1.2 ... 1.25, 2.0, etc.
+   * 2) Any incompatible change of DB schema is a major upgrade, and any
+   *    compatible change of DB schema is a minor upgrade.
+   * 3) Within a minor upgrade, say 1.1 to 1.2:
+   *    overwrite the version info and proceed as normal.
+   * 4) Within a major upgrade, say 1.2 to 2.0:
+   *    throw an exception and instruct the user to use a separate upgrade tool to
+   *    upgrade shuffle info or remove incompatible old state.
+   */
+  private void checkVersion() throws IOException {
+    Version loadedVersion = loadVersion();
+    LOG.info("Loaded state DB schema version info " + loadedVersion);
+    if (loadedVersion.equals(getCurrentVersion())) {
+      return;
+    }
+    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing state DB schema version info " + getCurrentVersion());
+      storeVersion();
+    } else {
+      throw new IOException(
+        "Incompatible version for state DB schema: expecting DB schema version " 
+            + getCurrentVersion() + ", but loading version " + loadedVersion);
+    }
+  }
+
+  private void addJobToken(JobID jobId, String user,
+      Token<JobTokenIdentifier> jobToken) {
+    userRsrc.put(jobId.toString(), user);
+    secretManager.addTokenForJob(jobId.toString(), jobToken);
+    LOG.info("Added token for " + jobId.toString());
+  }
+
+  private void recoverJobShuffleInfo(String jobIdStr, byte[] data)
+      throws IOException {
+    JobID jobId;
+    try {
+      jobId = JobID.forName(jobIdStr);
+    } catch (IllegalArgumentException e) {
+      throw new IOException("Bad job ID " + jobIdStr + " in state store", e);
+    }
+
+    JobShuffleInfoProto proto = JobShuffleInfoProto.parseFrom(data);
+    String user = proto.getUser();
+    TokenProto tokenProto = proto.getJobToken();
+    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
+        tokenProto.getIdentifier().toByteArray(),
+        tokenProto.getPassword().toByteArray(),
+        new Text(tokenProto.getKind()), new Text(tokenProto.getService()));
+    addJobToken(jobId, user, jobToken);
+  }
+
+  private void recordJobShuffleInfo(JobID jobId, String user,
+      Token<JobTokenIdentifier> jobToken) throws IOException {
+    if (stateDb != null) {
+      TokenProto tokenProto = TokenProto.newBuilder()
+          .setIdentifier(ByteString.copyFrom(jobToken.getIdentifier()))
+          .setPassword(ByteString.copyFrom(jobToken.getPassword()))
+          .setKind(jobToken.getKind().toString())
+          .setService(jobToken.getService().toString())
+          .build();
+      JobShuffleInfoProto proto = JobShuffleInfoProto.newBuilder()
+          .setUser(user).setJobToken(tokenProto).build();
+      try {
+        stateDb.put(bytes(jobId.toString()), proto.toByteArray());
+      } catch (DBException e) {
+        throw new IOException("Error storing " + jobId, e);
+      }
+    }
+    addJobToken(jobId, user, jobToken);
+  }
+
+  private void removeJobShuffleInfo(JobID jobId) throws IOException {
+    String jobIdStr = jobId.toString();
+    secretManager.removeTokenForJob(jobIdStr);
+    userRsrc.remove(jobIdStr);
+    if (stateDb != null) {
+      try {
+        stateDb.delete(bytes(jobIdStr));
+      } catch (DBException e) {
+        throw new IOException("Unable to remove " + jobId
+            + " from state store", e);
+      }
+    }
+  }
+
+  private static class LevelDBLogger implements Logger {
+    private static final Log LOG = LogFactory.getLog(LevelDBLogger.class);
+
+    @Override
+    public void log(String message) {
+      LOG.info(message);
+    }
+  }
+
   class HttpPipelineFactory implements ChannelPipelineFactory {
 
     final Shuffle SHUFFLE;
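
The schema handling added to ShuffleHandler hinges on Version.isCompatibleTo,
which per the versioning comment above treats versions sharing a major number
as compatible. A compact illustration of the rule checkVersion() enforces (the
version values below are examples, not data from a real state store):

    import org.apache.hadoop.yarn.server.records.Version;

    public class VersionCheckSketch {
      public static void main(String[] args) {
        Version current = Version.newInstance(1, 0);
        Version minorBump = Version.newInstance(1, 1);
        Version majorBump = Version.newInstance(2, 1);
        // Same major version: compatible, so the stored version is overwritten.
        System.out.println(minorBump.isCompatibleTo(current)); // true
        // Different major version: incompatible, so serviceStart throws.
        System.out.println(majorBump.isCompatibleTo(current)); // false
      }
    }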

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Tue Aug 19 23:49:39 2014
@@ -51,11 +51,15 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
@@ -63,12 +67,15 @@ import org.apache.hadoop.metrics2.Metric
 import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.records.Version;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -645,4 +652,181 @@ public class TestShuffleHandler {
     output.writeLong(chk.getChecksum().getValue());
     output.close();
   }
+
+  @Test
+  public void testRecovery() throws IOException {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(12345, 1);
+    final JobID jobId = JobID.downgrade(TypeConverter.fromYarn(appId));
+    final File tmpDir = new File(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir")),
+        TestShuffleHandler.class.getName());
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    ShuffleHandler shuffle = new ShuffleHandler();
+    // emulate aux services startup with recovery enabled
+    shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+    tmpDir.mkdirs();
+    try {
+      shuffle.init(conf);
+      shuffle.start();
+
+      // setup a shuffle token for an application
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
+          "identifier".getBytes(), "password".getBytes(), new Text(user),
+          new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffle.initializeApplication(new ApplicationInitializationContext(user,
+          appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+            outputBuffer.getLength())));
+
+      // verify we are authorized to shuffle
+      int rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+      // emulate shuffle handler restart
+      shuffle.close();
+      shuffle = new ShuffleHandler();
+      shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+      shuffle.init(conf);
+      shuffle.start();
+
+      // verify we are still authorized to shuffle to the old application
+      rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+      // shutdown app and verify access is lost
+      shuffle.stopApplication(new ApplicationTerminationContext(appId));
+      rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc);
+
+      // emulate shuffle handler restart
+      shuffle.close();
+      shuffle = new ShuffleHandler();
+      shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+      shuffle.init(conf);
+      shuffle.start();
+
+      // verify we still don't have access
+      rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc);
+    } finally {
+      if (shuffle != null) {
+        shuffle.close();
+      }
+      FileUtil.fullyDelete(tmpDir);
+    }
+  }
+  
+  @Test
+  public void testRecoveryFromOtherVersions() throws IOException {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(12345, 1);
+    final File tmpDir = new File(System.getProperty("test.build.data",
+        System.getProperty("java.io.tmpdir")),
+        TestShuffleHandler.class.getName());
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    ShuffleHandler shuffle = new ShuffleHandler();
+    // emulate aux services startup with recovery enabled
+    shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+    tmpDir.mkdirs();
+    try {
+      shuffle.init(conf);
+      shuffle.start();
+
+      // setup a shuffle token for an application
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
+          "identifier".getBytes(), "password".getBytes(), new Text(user),
+          new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffle.initializeApplication(new ApplicationInitializationContext(user,
+          appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+              outputBuffer.getLength())));
+
+      // verify we are authorized to shuffle
+      int rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+      // emulate shuffle handler restart
+      shuffle.close();
+      shuffle = new ShuffleHandler();
+      shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+      shuffle.init(conf);
+      shuffle.start();
+
+      // verify we are still authorized to shuffle to the old application
+      rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+      Version version = Version.newInstance(1, 0);
+      Assert.assertEquals(version, shuffle.getCurrentVersion());
+    
+      // emulate shuffle handler restart with compatible version
+      Version version11 = Version.newInstance(1, 1);
+      // update version info before closing shuffle
+      shuffle.storeVersion(version11);
+      Assert.assertEquals(version11, shuffle.loadVersion());
+      shuffle.close();
+      shuffle = new ShuffleHandler();
+      shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+      shuffle.init(conf);
+      shuffle.start();
+      // shuffle version will be overwritten by CURRENT_VERSION_INFO after a
+      // successful restart.
+      Assert.assertEquals(version, shuffle.loadVersion());
+      // verify we are still authorized to shuffle to the old application
+      rc = getShuffleResponseCode(shuffle, jt);
+      Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+    
+      // emulate shuffle handler restart with incompatible version
+      Version version21 = Version.newInstance(2, 1);
+      shuffle.storeVersion(version21);
+      Assert.assertEquals(version21, shuffle.loadVersion());
+      shuffle.close();
+      shuffle = new ShuffleHandler();
+      shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+      shuffle.init(conf);
+    
+      try {
+        shuffle.start();
+        Assert.fail("Incompatible version, start should have failed.");
+      } catch (ServiceStateException e) {
+        Assert.assertTrue("Exception message mismatch", 
+        e.getMessage().contains("Incompatible version for state DB schema:"));
+      } 
+    
+    } finally {
+      if (shuffle != null) {
+        shuffle.close();
+      }
+      FileUtil.fullyDelete(tmpDir);
+    }
+  }
+
+  private static int getShuffleResponseCode(ShuffleHandler shuffle,
+      Token<JobTokenIdentifier> jt) throws IOException {
+    URL url = new URL("http://127.0.0.1:"
+        + shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+        + "/mapOutput?job=job_12345_0001&reduce=0&map=attempt_12345_1_m_1_0");
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    String encHash = SecureShuffleUtils.hashFromString(
+        SecureShuffleUtils.buildMsgFrom(url),
+        JobTokenSecretManager.createSecretKey(jt.getPassword()));
+    conn.addRequestProperty(
+        SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    conn.connect();
+    int rc = conn.getResponseCode();
+    conn.disconnect();
+    return rc;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml Tue Aug 19 23:49:39 2014
@@ -72,24 +72,6 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>commons-el</groupId>
-          <artifactId>commons-el</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-runtime</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-compiler</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jsp-2.1-jetty</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
 
     <dependency>

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java Tue Aug 19 23:49:39 2014
@@ -68,8 +68,8 @@ public class WordCount {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    if (otherArgs.length != 2) {
-      System.err.println("Usage: wordcount <in> <out>");
+    if (otherArgs.length < 2) {
+      System.err.println("Usage: wordcount <in> [<in>...] <out>");
       System.exit(2);
     }
     Job job = new Job(conf, "word count");
@@ -79,8 +79,11 @@ public class WordCount {
     job.setReducerClass(IntSumReducer.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(IntWritable.class);
-    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
+    for (int i = 0; i < otherArgs.length - 1; ++i) {
+      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
+    }
+    FileOutputFormat.setOutputPath(job,
+      new Path(otherArgs[otherArgs.length - 1]));
     System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/pom.xml Tue Aug 19 23:49:39 2014
@@ -81,24 +81,6 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>commons-el</groupId>
-          <artifactId>commons-el</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-runtime</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-compiler</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jsp-2.1-jetty</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
 
     <dependency>
@@ -162,6 +144,10 @@
       <artifactId>hsqldb</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+    </dependency>
 
   </dependencies>