You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:52 UTC
svn commit: r1619012 [7/7] - in
/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project: ./ bin/
conf/ dev-support/ hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,7 @@
package org.apache.hadoop.mapreduce.lib.jobcontrol;
-import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java Tue Aug 19 23:49:39 2014
@@ -26,7 +26,7 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -198,6 +198,11 @@ public class TestJHSSecurity {
fail("Unexpected exception" + e);
}
cancelDelegationToken(loggedInUser, hsService, token);
+
+      // Test obtaining a token with a different renewer and then cancelling it
+ Token tokenWithDifferentRenewer = getDelegationToken(loggedInUser,
+ hsService, "yarn");
+ cancelDelegationToken(loggedInUser, hsService, tokenWithDifferentRenewer);
if (clientUsingDT != null) {
// RPC.stopProxy(clientUsingDT);
clientUsingDT = null;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAMWithNonNormalizedCapabilities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAMWithNonNormalizedCapabilities.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAMWithNonNormalizedCapabilities.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAMWithNonNormalizedCapabilities.java Tue Aug 19 23:49:39 2014
@@ -21,7 +21,7 @@ package org.apache.hadoop.mapreduce.v2;
import java.io.File;
import java.io.IOException;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -118,4 +118,4 @@ public class TestMRAMWithNonNormalizedCa
mrCluster.stop();
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Tue Aug 19 23:49:39 2014
@@ -33,8 +33,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
-import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.FailingMapper;
@@ -77,6 +77,10 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.ApplicationClassLoader;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.junit.AfterClass;
@@ -210,7 +215,19 @@ public class TestMRJobs {
@Test(timeout = 300000)
public void testJobClassloader() throws IOException, InterruptedException,
ClassNotFoundException {
- LOG.info("\n\n\nStarting testJobClassloader().");
+ testJobClassloader(false);
+ }
+
+ @Test(timeout = 300000)
+ public void testJobClassloaderWithCustomClasses() throws IOException,
+ InterruptedException, ClassNotFoundException {
+ testJobClassloader(true);
+ }
+
+ private void testJobClassloader(boolean useCustomClasses) throws IOException,
+ InterruptedException, ClassNotFoundException {
+ LOG.info("\n\n\nStarting testJobClassloader()"
+ + " useCustomClasses=" + useCustomClasses);
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -221,6 +238,19 @@ public class TestMRJobs {
// set master address to local to test that local mode applied iff framework == local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
+ if (useCustomClasses) {
+ // to test AM loading user classes such as output format class, we want
+ // to blacklist them from the system classes (they need to be prepended
+ // as the first match wins)
+ String systemClasses =
+ sleepConf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
+ // exclude the custom classes from system classes
+ systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" +
+ CustomSpeculator.class.getName() + "," +
+ systemClasses;
+ sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
+ systemClasses);
+ }
sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
@@ -233,12 +263,66 @@ public class TestMRJobs {
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(SleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
+ if (useCustomClasses) {
+ // set custom output format class and speculator class
+ job.setOutputFormatClass(CustomOutputFormat.class);
+ final Configuration jobConf = job.getConfiguration();
+ jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class,
+ Speculator.class);
+ // speculation needs to be enabled for the speculator to be loaded
+ jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
+ }
job.submit();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
succeeded);
}
+ public static class CustomOutputFormat<K,V> extends NullOutputFormat<K,V> {
+ public CustomOutputFormat() {
+ verifyClassLoader(getClass());
+ }
+
+ /**
+ * Verifies that the class was loaded by the job classloader if it is in the
+ * context of the MRAppMaster, and if not throws an exception to fail the
+ * job.
+ */
+ private void verifyClassLoader(Class<?> cls) {
+ // to detect that it is instantiated in the context of the MRAppMaster, we
+      // inspect the stack trace and determine whether a caller is MRAppMaster
+ for (StackTraceElement e: new Throwable().getStackTrace()) {
+ if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+ !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+ throw new ExceptionInInitializerError("incorrect classloader used");
+ }
+ }
+ }
+ }
+
+ public static class CustomSpeculator extends DefaultSpeculator {
+ public CustomSpeculator(Configuration conf, AppContext context) {
+ super(conf, context);
+ verifyClassLoader(getClass());
+ }
+
+ /**
+ * Verifies that the class was loaded by the job classloader if it is in the
+ * context of the MRAppMaster, and if not throws an exception to fail the
+ * job.
+ */
+ private void verifyClassLoader(Class<?> cls) {
+ // to detect that it is instantiated in the context of the MRAppMaster, we
+      // inspect the stack trace and determine whether a caller is MRAppMaster
+ for (StackTraceElement e: new Throwable().getStackTrace()) {
+ if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+ !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+ throw new ExceptionInInitializerError("incorrect classloader used");
+ }
+ }
+ }
+ }
+
protected void verifySleepJobCounters(Job job) throws InterruptedException,
IOException {
Counters counters = job.getCounters();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,7 @@ import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.avro.AvroRemoteException;
import org.apache.commons.logging.Log;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,8 @@ import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import junit.framework.Assert;
+import org.junit.AfterClass;
+import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,8 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestMRJobsWithProfiler {
@@ -51,6 +51,8 @@ public class TestMRJobsWithProfiler {
private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
+ private static final int PROFILED_TASK_ID = 1;
+
private static MiniMRYarnCluster mrCluster;
private static final Configuration CONF = new Configuration();
@@ -69,8 +71,8 @@ public class TestMRJobsWithProfiler {
private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
- @Before
- public void setup() throws InterruptedException, IOException {
+ @BeforeClass
+ public static void setup() throws InterruptedException, IOException {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -79,7 +81,7 @@ public class TestMRJobsWithProfiler {
}
if (mrCluster == null) {
- mrCluster = new MiniMRYarnCluster(getClass().getName());
+ mrCluster = new MiniMRYarnCluster(TestMRJobsWithProfiler.class.getName());
mrCluster.init(CONF);
mrCluster.start();
}
@@ -90,8 +92,8 @@ public class TestMRJobsWithProfiler {
localFs.setPermission(APP_JAR, new FsPermission("700"));
}
- @After
- public void tearDown() {
+ @AfterClass
+ public static void tearDown() {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
@@ -103,10 +105,19 @@ public class TestMRJobsWithProfiler {
}
}
+ @Test (timeout = 150000)
+ public void testDefaultProfiler() throws Exception {
+ LOG.info("Starting testDefaultProfiler");
+ testProfilerInternal(true);
+ }
@Test (timeout = 150000)
- public void testProfiler() throws IOException, InterruptedException,
- ClassNotFoundException {
+ public void testDifferentProfilers() throws Exception {
+    LOG.info("Starting testDifferentProfilers");
+ testProfilerInternal(false);
+ }
+
+ private void testProfilerInternal(boolean useDefault) throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
@@ -117,18 +128,19 @@ public class TestMRJobsWithProfiler {
final JobConf sleepConf = new JobConf(mrCluster.getConfig());
sleepConf.setProfileEnabled(true);
- // profile map split 1
- sleepConf.setProfileTaskRange(true, "1");
- // profile reduce of map output partitions 1
- sleepConf.setProfileTaskRange(false, "1");
-
- // use hprof for map to profile.out
- sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
- "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
- + "file=%s");
+ sleepConf.setProfileTaskRange(true, String.valueOf(PROFILED_TASK_ID));
+ sleepConf.setProfileTaskRange(false, String.valueOf(PROFILED_TASK_ID));
+
+ if (!useDefault) {
+ // use hprof for map to profile.out
+ sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
+ "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
+ + "file=%s");
+
+ // use Xprof for reduce to stdout
+ sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
+ }
- // use Xprof for reduce to stdout
- sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
sleepJob.setConf(sleepConf);
// 2-map-2-reduce SleepJob
@@ -205,8 +217,8 @@ public class TestMRJobsWithProfiler {
TaskLog.LogName.PROFILE.toString());
final Path stdoutPath = new Path(dirEntry.getValue(),
TaskLog.LogName.STDOUT.toString());
- if (tid.getTaskType() == TaskType.MAP) {
- if (tid.getTaskID().getId() == 1) {
+ if (useDefault || tid.getTaskType() == TaskType.MAP) {
+ if (tid.getTaskID().getId() == PROFILED_TASK_ID) {
// verify profile.out
final BufferedReader br = new BufferedReader(new InputStreamReader(
localFs.open(profilePath)));
@@ -222,7 +234,8 @@ public class TestMRJobsWithProfiler {
} else {
Assert.assertFalse("hprof file should not exist",
localFs.exists(profilePath));
- if (tid.getTaskID().getId() == 1) {
+ if (tid.getTaskID().getId() == PROFILED_TASK_ID) {
+ // reducer is profiled with Xprof
final BufferedReader br = new BufferedReader(new InputStreamReader(
localFs.open(stdoutPath)));
boolean flatProfFound = false;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Random;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -40,22 +40,26 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Test;
+import com.google.common.base.Supplier;
+
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestSpeculativeExecutionWithMRApp {
private static final int NUM_MAPPERS = 5;
private static final int NUM_REDUCERS = 0;
- @Test(timeout = 60000)
+ @Test
public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
Clock actualClock = new SystemClock();
- ControlledClock clock = new ControlledClock(actualClock);
+ final ControlledClock clock = new ControlledClock(actualClock);
clock.setTime(System.currentTimeMillis());
MRApp app =
@@ -88,7 +92,7 @@ public class TestSpeculativeExecutionWit
Random generator = new Random();
Object[] taskValues = tasks.values().toArray();
- Task taskToBeSpeculated =
+ final Task taskToBeSpeculated =
(Task) taskValues[generator.nextInt(taskValues.length)];
// Other than one random task, finish every other task.
@@ -105,30 +109,28 @@ public class TestSpeculativeExecutionWit
}
}
- int maxTimeWait = 10;
- boolean successfullySpeculated = false;
- TaskAttempt[] ta = null;
- while (maxTimeWait > 0 && !successfullySpeculated) {
- if (taskToBeSpeculated.getAttempts().size() != 2) {
- Thread.sleep(1000);
- clock.setTime(System.currentTimeMillis() + 20000);
- } else {
- successfullySpeculated = true;
- // finish 1st TA, 2nd will be killed
- ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ if (taskToBeSpeculated.getAttempts().size() != 2) {
+ clock.setTime(System.currentTimeMillis() + 1000);
+ return false;
+ } else {
+ return true;
+ }
}
- maxTimeWait--;
- }
- Assert
- .assertTrue("Couldn't speculate successfully", successfullySpeculated);
+ }, 1000, 60000);
+ // finish 1st TA, 2nd will be killed
+ TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
verifySpeculationMessage(app, ta);
+ app.waitForState(Service.STATE.STOPPED);
}
- @Test(timeout = 60000)
+ @Test
public void testSepculateSuccessfulWithUpdateEvents() throws Exception {
Clock actualClock = new SystemClock();
- ControlledClock clock = new ControlledClock(actualClock);
+ final ControlledClock clock = new ControlledClock(actualClock);
clock.setTime(System.currentTimeMillis());
MRApp app =
@@ -200,21 +202,21 @@ public class TestSpeculativeExecutionWit
}
}
- int maxTimeWait = 5;
- boolean successfullySpeculated = false;
- TaskAttempt[] ta = null;
- while (maxTimeWait > 0 && !successfullySpeculated) {
- if (speculatedTask.getAttempts().size() != 2) {
- Thread.sleep(1000);
- } else {
- successfullySpeculated = true;
- ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
+ final Task speculatedTaskConst = speculatedTask;
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ if (speculatedTaskConst.getAttempts().size() != 2) {
+ clock.setTime(System.currentTimeMillis() + 1000);
+ return false;
+ } else {
+ return true;
+ }
}
- maxTimeWait--;
- }
- Assert
- .assertTrue("Couldn't speculate successfully", successfullySpeculated);
+ }, 1000, 60000);
+ TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
verifySpeculationMessage(app, ta);
+ app.waitForState(Service.STATE.STOPPED);
}
private static TaskAttempt[] makeFirstAttemptWin(
@@ -234,15 +236,7 @@ public class TestSpeculativeExecutionWit
private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta)
throws Exception {
app.waitForState(ta[0], TaskAttemptState.SUCCEEDED);
- app.waitForState(ta[1], TaskAttemptState.KILLED);
- boolean foundSpecMsg = false;
- for (String msg : ta[1].getDiagnostics()) {
- if (msg.contains("Speculation")) {
- foundSpecMsg = true;
- break;
- }
- }
- Assert.assertTrue("No speculation diagnostics!", foundSpecMsg);
+ // The speculative attempt may be not killed before the MR job succeeds.
}
private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml Tue Aug 19 23:49:39 2014
@@ -35,12 +35,52 @@
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ <artifactId>leveldbjni-all</artifactId>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto</param>
+ <param>${basedir}/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>ShuffleHandlerRecovery.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapred;
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
@@ -60,6 +62,8 @@ import org.apache.hadoop.io.DataInputByt
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
@@ -72,16 +76,27 @@ import org.apache.hadoop.metrics2.lib.De
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Logger;
+import org.iq80.leveldb.Options;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@@ -113,8 +128,10 @@ import org.jboss.netty.handler.stream.Ch
import org.jboss.netty.util.CharsetUtil;
import org.mortbay.jetty.HttpHeaders;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
public class ShuffleHandler extends AuxiliaryService {
@@ -132,6 +149,11 @@ public class ShuffleHandler extends Auxi
"^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
Pattern.CASE_INSENSITIVE);
+ private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
+ private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version";
+ protected static final Version CURRENT_VERSION_INFO =
+ Version.newInstance(1, 0);
+
private int port;
private ChannelFactory selector;
private final ChannelGroup accepted = new DefaultChannelGroup();
@@ -149,14 +171,14 @@ public class ShuffleHandler extends Auxi
private boolean shuffleTransferToAllowed;
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+ private Map<String,String> userRsrc;
+ private JobTokenSecretManager secretManager;
+
+ private DB stateDb = null;
+
public static final String MAPREDUCE_SHUFFLE_SERVICEID =
"mapreduce_shuffle";
- private static final Map<String,String> userRsrc =
- new ConcurrentHashMap<String,String>();
- private static final JobTokenSecretManager secretManager =
- new JobTokenSecretManager();
-
public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
public static final int DEFAULT_SHUFFLE_PORT = 13562;
@@ -292,9 +314,7 @@ public class ShuffleHandler extends Auxi
Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
// TODO: Once SHuffle is out of NM, this can use MR APIs
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
- userRsrc.put(jobId.toString(), user);
- LOG.info("Added token for " + jobId.toString());
- secretManager.addTokenForJob(jobId.toString(), jt);
+ recordJobShuffleInfo(jobId, user, jt);
} catch (IOException e) {
LOG.error("Error during initApp", e);
// TODO add API to AuxiliaryServices to report failures
@@ -305,8 +325,12 @@ public class ShuffleHandler extends Auxi
public void stopApplication(ApplicationTerminationContext context) {
ApplicationId appId = context.getApplicationId();
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
- secretManager.removeTokenForJob(jobId.toString());
- userRsrc.remove(jobId.toString());
+ try {
+ removeJobShuffleInfo(jobId);
+ } catch (IOException e) {
+ LOG.error("Error during stopApp", e);
+ // TODO add API to AuxiliaryServices to report failures
+ }
}
@Override
@@ -350,6 +374,9 @@ public class ShuffleHandler extends Auxi
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
+ userRsrc = new ConcurrentHashMap<String,String>();
+ secretManager = new JobTokenSecretManager();
+ recoverState(conf);
ServerBootstrap bootstrap = new ServerBootstrap(selector);
try {
pipelineFact = new HttpPipelineFactory(conf);
@@ -389,6 +416,9 @@ public class ShuffleHandler extends Auxi
if (pipelineFact != null) {
pipelineFact.destroy();
}
+ if (stateDb != null) {
+ stateDb.close();
+ }
super.serviceStop();
}
@@ -407,6 +437,191 @@ public class ShuffleHandler extends Auxi
return new Shuffle(conf);
}
+ private void recoverState(Configuration conf) throws IOException {
+ Path recoveryRoot = getRecoveryPath();
+ if (recoveryRoot != null) {
+ startStore(recoveryRoot);
+ Pattern jobPattern = Pattern.compile(JobID.JOBID_REGEX);
+ LeveldbIterator iter = null;
+ try {
+ iter = new LeveldbIterator(stateDb);
+ iter.seek(bytes(JobID.JOB));
+ while (iter.hasNext()) {
+ Map.Entry<byte[],byte[]> entry = iter.next();
+ String key = asString(entry.getKey());
+ if (!jobPattern.matcher(key).matches()) {
+ break;
+ }
+ recoverJobShuffleInfo(key, entry.getValue());
+ }
+ } catch (DBException e) {
+ throw new IOException("Database error during recovery", e);
+ } finally {
+ if (iter != null) {
+ iter.close();
+ }
+ }
+ }
+ }
+
+ private void startStore(Path recoveryRoot) throws IOException {
+ Options options = new Options();
+ options.createIfMissing(false);
+ options.logger(new LevelDBLogger());
+ Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
+ LOG.info("Using state database at " + dbPath + " for recovery");
+ File dbfile = new File(dbPath.toString());
+ try {
+ stateDb = JniDBFactory.factory.open(dbfile, options);
+ } catch (NativeDB.DBException e) {
+ if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+ LOG.info("Creating state database at " + dbfile);
+ options.createIfMissing(true);
+ try {
+ stateDb = JniDBFactory.factory.open(dbfile, options);
+ storeVersion();
+ } catch (DBException dbExc) {
+ throw new IOException("Unable to create state store", dbExc);
+ }
+ } else {
+ throw e;
+ }
+ }
+ checkVersion();
+ }
+
+ @VisibleForTesting
+ Version loadVersion() throws IOException {
+ byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
+ // if version is not stored previously, treat it as 1.0.
+ if (data == null || data.length == 0) {
+ return Version.newInstance(1, 0);
+ }
+ Version version =
+ new VersionPBImpl(VersionProto.parseFrom(data));
+ return version;
+ }
+
+ private void storeSchemaVersion(Version version) throws IOException {
+ String key = STATE_DB_SCHEMA_VERSION_KEY;
+ byte[] data =
+ ((VersionPBImpl) version).getProto().toByteArray();
+ try {
+ stateDb.put(bytes(key), data);
+ } catch (DBException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ private void storeVersion() throws IOException {
+ storeSchemaVersion(CURRENT_VERSION_INFO);
+ }
+
+ // Only used for test
+ @VisibleForTesting
+ void storeVersion(Version version) throws IOException {
+ storeSchemaVersion(version);
+ }
+
+ protected Version getCurrentVersion() {
+ return CURRENT_VERSION_INFO;
+ }
+
+ /**
+ * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
+ * 2) Any incompatible change of DB schema is a major upgrade, and any
+ * compatible change of DB schema is a minor upgrade.
+ * 3) Within a minor upgrade, say 1.1 to 1.2:
+ * overwrite the version info and proceed as normal.
+ * 4) Within a major upgrade, say 1.2 to 2.0:
+ * throw exception and indicate user to use a separate upgrade tool to
+ * upgrade shuffle info or remove incompatible old state.
+ */
+ private void checkVersion() throws IOException {
+ Version loadedVersion = loadVersion();
+ LOG.info("Loaded state DB schema version info " + loadedVersion);
+ if (loadedVersion.equals(getCurrentVersion())) {
+ return;
+ }
+ if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing state DB schema version info " + getCurrentVersion());
+ storeVersion();
+ } else {
+ throw new IOException(
+ "Incompatible version for state DB schema: expecting DB schema version "
+ + getCurrentVersion() + ", but loading version " + loadedVersion);
+ }
+ }
+
+ private void addJobToken(JobID jobId, String user,
+ Token<JobTokenIdentifier> jobToken) {
+ userRsrc.put(jobId.toString(), user);
+ secretManager.addTokenForJob(jobId.toString(), jobToken);
+ LOG.info("Added token for " + jobId.toString());
+ }
+
+ private void recoverJobShuffleInfo(String jobIdStr, byte[] data)
+ throws IOException {
+ JobID jobId;
+ try {
+ jobId = JobID.forName(jobIdStr);
+ } catch (IllegalArgumentException e) {
+ throw new IOException("Bad job ID " + jobIdStr + " in state store", e);
+ }
+
+ JobShuffleInfoProto proto = JobShuffleInfoProto.parseFrom(data);
+ String user = proto.getUser();
+ TokenProto tokenProto = proto.getJobToken();
+ Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
+ tokenProto.getIdentifier().toByteArray(),
+ tokenProto.getPassword().toByteArray(),
+ new Text(tokenProto.getKind()), new Text(tokenProto.getService()));
+ addJobToken(jobId, user, jobToken);
+ }
+
+ private void recordJobShuffleInfo(JobID jobId, String user,
+ Token<JobTokenIdentifier> jobToken) throws IOException {
+ if (stateDb != null) {
+ TokenProto tokenProto = TokenProto.newBuilder()
+ .setIdentifier(ByteString.copyFrom(jobToken.getIdentifier()))
+ .setPassword(ByteString.copyFrom(jobToken.getPassword()))
+ .setKind(jobToken.getKind().toString())
+ .setService(jobToken.getService().toString())
+ .build();
+ JobShuffleInfoProto proto = JobShuffleInfoProto.newBuilder()
+ .setUser(user).setJobToken(tokenProto).build();
+ try {
+ stateDb.put(bytes(jobId.toString()), proto.toByteArray());
+ } catch (DBException e) {
+ throw new IOException("Error storing " + jobId, e);
+ }
+ }
+ addJobToken(jobId, user, jobToken);
+ }
+
+ private void removeJobShuffleInfo(JobID jobId) throws IOException {
+ String jobIdStr = jobId.toString();
+ secretManager.removeTokenForJob(jobIdStr);
+ userRsrc.remove(jobIdStr);
+ if (stateDb != null) {
+ try {
+ stateDb.delete(bytes(jobIdStr));
+ } catch (DBException e) {
+ throw new IOException("Unable to remove " + jobId
+ + " from state store", e);
+ }
+ }
+ }
+
+ private static class LevelDBLogger implements Logger {
+ private static final Log LOG = LogFactory.getLog(LevelDBLogger.class);
+
+ @Override
+ public void log(String message) {
+ LOG.info(message);
+ }
+ }
+
class HttpPipelineFactory implements ChannelPipelineFactory {
final Shuffle SHUFFLE;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Tue Aug 19 23:49:39 2014
@@ -51,11 +51,15 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
@@ -63,12 +67,15 @@ import org.apache.hadoop.metrics2.Metric
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.records.Version;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -645,4 +652,181 @@ public class TestShuffleHandler {
output.writeLong(chk.getChecksum().getValue());
output.close();
}
+
+ @Test
+ public void testRecovery() throws IOException {
+ final String user = "someuser";
+ final ApplicationId appId = ApplicationId.newInstance(12345, 1);
+ final JobID jobId = JobID.downgrade(TypeConverter.fromYarn(appId));
+ final File tmpDir = new File(System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")),
+ TestShuffleHandler.class.getName());
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+ ShuffleHandler shuffle = new ShuffleHandler();
+ // emulate aux services startup with recovery enabled
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ tmpDir.mkdirs();
+ try {
+ shuffle.init(conf);
+ shuffle.start();
+
+ // setup a shuffle token for an application
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ outputBuffer.reset();
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
+ "identifier".getBytes(), "password".getBytes(), new Text(user),
+ new Text("shuffleService"));
+ jt.write(outputBuffer);
+ shuffle.initializeApplication(new ApplicationInitializationContext(user,
+ appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+ outputBuffer.getLength())));
+
+ // verify we are authorized to shuffle
+ int rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+ // emulate shuffle handler restart
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+ shuffle.start();
+
+ // verify we are still authorized to shuffle to the old application
+ rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+ // shutdown app and verify access is lost
+ shuffle.stopApplication(new ApplicationTerminationContext(appId));
+ rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc);
+
+ // emulate shuffle handler restart
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+ shuffle.start();
+
+ // verify we still don't have access
+ rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc);
+ } finally {
+ if (shuffle != null) {
+ shuffle.close();
+ }
+ FileUtil.fullyDelete(tmpDir);
+ }
+ }
+
+ @Test
+ public void testRecoveryFromOtherVersions() throws IOException {
+ final String user = "someuser";
+ final ApplicationId appId = ApplicationId.newInstance(12345, 1);
+ final File tmpDir = new File(System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")),
+ TestShuffleHandler.class.getName());
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+ ShuffleHandler shuffle = new ShuffleHandler();
+ // emulate aux services startup with recovery enabled
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ tmpDir.mkdirs();
+ try {
+ shuffle.init(conf);
+ shuffle.start();
+
+ // setup a shuffle token for an application
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ outputBuffer.reset();
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
+ "identifier".getBytes(), "password".getBytes(), new Text(user),
+ new Text("shuffleService"));
+ jt.write(outputBuffer);
+ shuffle.initializeApplication(new ApplicationInitializationContext(user,
+ appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+ outputBuffer.getLength())));
+
+ // verify we are authorized to shuffle
+ int rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+ // emulate shuffle handler restart
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+ shuffle.start();
+
+ // verify we are still authorized to shuffle to the old application
+ rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+ Version version = Version.newInstance(1, 0);
+ Assert.assertEquals(version, shuffle.getCurrentVersion());
+
+ // emulate shuffle handler restart with compatible version
+ Version version11 = Version.newInstance(1, 1);
+ // update version info before close shuffle
+ shuffle.storeVersion(version11);
+ Assert.assertEquals(version11, shuffle.loadVersion());
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+ shuffle.start();
+      // shuffle version will be overridden by CURRENT_VERSION_INFO after a
+      // successful restart.
+ Assert.assertEquals(version, shuffle.loadVersion());
+ // verify we are still authorized to shuffle to the old application
+ rc = getShuffleResponseCode(shuffle, jt);
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+ // emulate shuffle handler restart with incompatible version
+ Version version21 = Version.newInstance(2, 1);
+ shuffle.storeVersion(version21);
+ Assert.assertEquals(version21, shuffle.loadVersion());
+ shuffle.close();
+ shuffle = new ShuffleHandler();
+ shuffle.setRecoveryPath(new Path(tmpDir.toString()));
+ shuffle.init(conf);
+
+ try {
+ shuffle.start();
+ Assert.fail("Incompatible version, should expect fail here.");
+ } catch (ServiceStateException e) {
+ Assert.assertTrue("Exception message mismatch",
+ e.getMessage().contains("Incompatible version for state DB schema:"));
+ }
+
+ } finally {
+ if (shuffle != null) {
+ shuffle.close();
+ }
+ FileUtil.fullyDelete(tmpDir);
+ }
+ }
+
+ private static int getShuffleResponseCode(ShuffleHandler shuffle,
+ Token<JobTokenIdentifier> jt) throws IOException {
+ URL url = new URL("http://127.0.0.1:"
+ + shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ + "/mapOutput?job=job_12345_0001&reduce=0&map=attempt_12345_1_m_1_0");
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ String encHash = SecureShuffleUtils.hashFromString(
+ SecureShuffleUtils.buildMsgFrom(url),
+ JobTokenSecretManager.createSecretKey(jt.getPassword()));
+ conn.addRequestProperty(
+ SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ conn.connect();
+ int rc = conn.getResponseCode();
+ conn.disconnect();
+ return rc;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml Tue Aug 19 23:49:39 2014
@@ -72,24 +72,6 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1-jetty</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java Tue Aug 19 23:49:39 2014
@@ -68,8 +68,8 @@ public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (otherArgs.length != 2) {
- System.err.println("Usage: wordcount <in> <out>");
+ if (otherArgs.length < 2) {
+ System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
@@ -79,8 +79,11 @@ public class WordCount {
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
- FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
- FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
+ for (int i = 0; i < otherArgs.length - 1; ++i) {
+ FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
+ }
+ FileOutputFormat.setOutputPath(job,
+ new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/pom.xml Tue Aug 19 23:49:39 2014
@@ -81,24 +81,6 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1-jetty</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
@@ -162,6 +144,10 @@
<artifactId>hsqldb</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ <artifactId>leveldbjni-all</artifactId>
+ </dependency>
</dependencies>