You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cn...@apache.org on 2014/01/23 18:49:27 UTC
svn commit: r1560768 - in
/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java...
Author: cnauroth
Date: Thu Jan 23 17:49:24 2014
New Revision: 1560768
URL: http://svn.apache.org/r1560768
Log:
Merge trunk to HDFS-4685.
Modified:
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed)
hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java
Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1559794-1560767
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt?rev=1560768&r1=1560767&r2=1560768&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt Thu Jan 23 17:49:24 2014
@@ -203,6 +203,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5672. Provide optional RollingFileAppender for container log4j
(syslog) (Gera Shegalov via jlowe)
+ MAPREDUCE-5725. Make explicit that TestNetworkedJob relies on the Capacity
+ Scheduler (Sandy Ryza)
+
OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
@@ -280,6 +283,14 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5724. JobHistoryServer does not start if HDFS is not running.
(tucu)
+ MAPREDUCE-5729. mapred job -list throws NPE (kasha)
+
+ MAPREDUCE-5693. Restore MRv1 behavior for log flush (Gera Shegalov via
+ jlowe)
+
+ MAPREDUCE-5723. MR AM container log can be truncated or empty.
+ (Mohammad Kamrul Islam via kasha)
+
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1559794-1560767
Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1559794-1560767
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1560768&r1=1560767&r2=1560768&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Thu Jan 23 17:49:24 2014
@@ -27,6 +27,7 @@ import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,7 +62,6 @@ import org.apache.hadoop.yarn.api.Applic
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.log4j.LogManager;
/**
* The main() for MapReduce task processes.
@@ -123,6 +123,7 @@ class YarnChild {
LOG.debug("PID: " + System.getenv().get("JVM_PID"));
Task task = null;
UserGroupInformation childUGI = null;
+ ScheduledExecutorService logSyncer = null;
try {
int idleLoopCount = 0;
@@ -161,6 +162,8 @@ class YarnChild {
// set job classloader if configured before invoking the task
MRApps.setJobClassLoader(job);
+ logSyncer = TaskLog.createLogSyncer();
+
// Create a final reference to the task for the doAs block
final Task taskFinal = task;
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@@ -214,10 +217,7 @@ class YarnChild {
} finally {
RPC.stopProxy(umbilical);
DefaultMetricsSystem.shutdown();
- // Shutting down log4j of the child-vm...
- // This assumes that on return from Task.run()
- // there is no more logging done.
- LogManager.shutdown();
+ TaskLog.syncLogsShutdown(logSyncer);
}
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1560768&r1=1560767&r2=1560768&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Jan 23 17:49:24 2014
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
@@ -45,6 +46,7 @@ import org.apache.hadoop.mapred.FileOutp
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -119,6 +121,7 @@ import org.apache.hadoop.service.Abstrac
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringInterner;
@@ -212,6 +215,7 @@ public class MRAppMaster extends Composi
boolean errorHappenedShutDown = false;
private String shutDownMessage = null;
JobStateInternal forcedState = null;
+ private final ScheduledExecutorService logSyncer;
private long recoveredJobStartTime = 0;
@@ -240,6 +244,7 @@ public class MRAppMaster extends Composi
this.nmHttpPort = nmHttpPort;
this.metrics = MRAppMetrics.create();
this.maxAppAttempts = maxAppAttempts;
+ logSyncer = TaskLog.createLogSyncer();
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
@@ -1078,6 +1083,12 @@ public class MRAppMaster extends Composi
// All components have started, start the job.
startJobs();
}
+
+ @Override
+ public void stop() {
+ super.stop();
+ TaskLog.syncLogsShutdown(logSyncer);
+ }
private void processRecovery() {
if (appAttemptID.getAttemptId() == 1) {
@@ -1395,9 +1406,7 @@ public class MRAppMaster extends Composi
initAndStartAppMaster(appMaster, conf, jobUserName);
} catch (Throwable t) {
LOG.fatal("Error starting MRAppMaster", t);
- System.exit(1);
- } finally {
- LogManager.shutdown();
+ ExitUtil.terminate(1, t);
}
}
@@ -1473,4 +1482,11 @@ public class MRAppMaster extends Composi
}
});
}
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ LogManager.shutdown();
+ }
+
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1560768&r1=1560767&r2=1560768&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Thu Jan 23 17:49:24 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -445,11 +446,18 @@ public class TypeConverter {
jobStatus.setStartTime(application.getStartTime());
jobStatus.setFinishTime(application.getFinishTime());
jobStatus.setFailureInfo(application.getDiagnostics());
- jobStatus.setNeededMem(application.getApplicationResourceUsageReport().getNeededResources().getMemory());
- jobStatus.setNumReservedSlots(application.getApplicationResourceUsageReport().getNumReservedContainers());
- jobStatus.setNumUsedSlots(application.getApplicationResourceUsageReport().getNumUsedContainers());
- jobStatus.setReservedMem(application.getApplicationResourceUsageReport().getReservedResources().getMemory());
- jobStatus.setUsedMem(application.getApplicationResourceUsageReport().getUsedResources().getMemory());
+ ApplicationResourceUsageReport resourceUsageReport =
+ application.getApplicationResourceUsageReport();
+ if (resourceUsageReport != null) {
+ jobStatus.setNeededMem(
+ resourceUsageReport.getNeededResources().getMemory());
+ jobStatus.setNumReservedSlots(
+ resourceUsageReport.getNumReservedContainers());
+ jobStatus.setNumUsedSlots(resourceUsageReport.getNumUsedContainers());
+ jobStatus.setReservedMem(
+ resourceUsageReport.getReservedResources().getMemory());
+ jobStatus.setUsedMem(resourceUsageReport.getUsedResources().getMemory());
+ }
return jobStatus;
}
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java?rev=1560768&r1=1560767&r2=1560768&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java Thu Jan 23 17:49:24 2014
@@ -23,8 +23,6 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
-import junit.framework.Assert;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -40,6 +38,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -112,6 +111,14 @@ public class TestTypeConverter {
when(mockReport.getUser()).thenReturn("dummy-user");
when(mockReport.getQueue()).thenReturn("dummy-queue");
String jobFile = "dummy-path/job.xml";
+
+ try {
+ JobStatus status = TypeConverter.fromYarn(mockReport, jobFile);
+ } catch (NullPointerException npe) {
+ Assert.fail("Type conversion from YARN fails for jobs without " +
+ "ApplicationUsageReport");
+ }
+
ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class);
Resource r = Records.newRecord(Resource.class);
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1560768&r1=1560767&r2=1560768&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Thu Jan 23 17:49:24 2014
@@ -23,12 +23,17 @@ import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,6 +49,8 @@ import org.apache.hadoop.io.SecureIOUtil
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
@@ -262,7 +269,86 @@ public class TaskLog {
}
writeToIndexFile(logLocation, isCleanup);
}
-
+
+ public static synchronized void syncLogsShutdown(
+ ScheduledExecutorService scheduler)
+ {
+ // flush standard streams
+ //
+ System.out.flush();
+ System.err.flush();
+
+ if (scheduler != null) {
+ scheduler.shutdownNow();
+ }
+
+ // flush & close all appenders
+ LogManager.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ public static synchronized void syncLogs() {
+ // flush standard streams
+ //
+ System.out.flush();
+ System.err.flush();
+
+ // flush flushable appenders
+ //
+ final Logger rootLogger = Logger.getRootLogger();
+ flushAppenders(rootLogger);
+ final Enumeration<Logger> allLoggers = rootLogger.getLoggerRepository().
+ getCurrentLoggers();
+ while (allLoggers.hasMoreElements()) {
+ final Logger l = allLoggers.nextElement();
+ flushAppenders(l);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void flushAppenders(Logger l) {
+ final Enumeration<Appender> allAppenders = l.getAllAppenders();
+ while (allAppenders.hasMoreElements()) {
+ final Appender a = allAppenders.nextElement();
+ if (a instanceof Flushable) {
+ try {
+ ((Flushable) a).flush();
+ } catch (IOException ioe) {
+ System.err.println(a + ": Failed to flush!"
+ + StringUtils.stringifyException(ioe));
+ }
+ }
+ }
+ }
+
+ public static ScheduledExecutorService createLogSyncer() {
+ final ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ final Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setDaemon(true);
+ t.setName("Thread for syncLogs");
+ return t;
+ }
+ });
+ ShutdownHookManager.get().addShutdownHook(new Runnable() {
+ @Override
+ public void run() {
+ TaskLog.syncLogsShutdown(scheduler);
+ }
+ }, 50);
+ scheduler.scheduleWithFixedDelay(
+ new Runnable() {
+ @Override
+ public void run() {
+ TaskLog.syncLogs();
+ }
+ }, 0L, 5L, TimeUnit.SECONDS);
+ return scheduler;
+ }
+
/**
* The filter for userlogs.
*/
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java?rev=1560768&r1=1560767&r2=1560768&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java Thu Jan 23 17:49:24 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
+import java.io.Flushable;
import java.util.LinkedList;
import java.util.Queue;
@@ -31,7 +32,7 @@ import org.apache.log4j.spi.LoggingEvent
*
*/
@InterfaceStability.Unstable
-public class TaskLogAppender extends FileAppender {
+public class TaskLogAppender extends FileAppender implements Flushable {
private String taskId; //taskId should be managed as String rather than TaskID object
//so that log4j can configure it from the configuration(log4j.properties).
private Integer maxEvents;
@@ -92,6 +93,7 @@ public class TaskLogAppender extends Fil
}
}
+ @Override
public void flush() {
if (qw != null) {
qw.flush();
Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1559794-1560767
Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java?rev=1560768&r1=1560767&r2=1560768&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java Thu Jan 23 17:49:24 2014
@@ -45,7 +45,9 @@ import org.apache.hadoop.mapred.lib.Iden
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.Test;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -76,8 +78,7 @@ public class TestNetworkedJob {
FileSystem fileSys = null;
try {
- mr = MiniMRClientClusterFactory.create(this.getClass(), 2,
- new Configuration());
+ mr = createMiniClusterWithCapacityScheduler();
JobConf job = new JobConf(mr.getConfig());
@@ -129,8 +130,7 @@ public class TestNetworkedJob {
FileSystem fileSys = null;
try {
- Configuration conf = new Configuration();
- mr = MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
+ mr = createMiniClusterWithCapacityScheduler();
JobConf job = new JobConf(mr.getConfig());
@@ -315,8 +315,7 @@ public class TestNetworkedJob {
FileSystem fileSys = null;
PrintStream oldOut = System.out;
try {
- Configuration conf = new Configuration();
- mr = MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
+ mr = createMiniClusterWithCapacityScheduler();
JobConf job = new JobConf(mr.getConfig());
@@ -392,4 +391,13 @@ public class TestNetworkedJob {
}
}
}
+
+ private MiniMRClientCluster createMiniClusterWithCapacityScheduler()
+ throws IOException {
+ Configuration conf = new Configuration();
+ // Expected queue names depending on Capacity Scheduler queue naming
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ CapacityScheduler.class);
+ return MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
+ }
}