Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2013/04/17 21:41:54 UTC
svn commit: r1469042 - in
/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Author: szetszwo
Date: Wed Apr 17 19:41:50 2013
New Revision: 1469042
URL: http://svn.apache.org/r1469042
Log:
Merge r1467713 through r1469041 from trunk.
Added:
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/job_1329348432655_0001-10.jhist
- copied unchanged from r1469041, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/job_1329348432655_0001-10.jhist
Modified:
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1467713-1469041
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Wed Apr 17 19:41:50 2013
@@ -200,6 +200,9 @@ Release 2.0.5-beta - UNRELEASED
OPTIMIZATIONS
+ MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
+ (Gelesh via bobby)
+
BUG FIXES
MAPREDUCE-4671. AM does not tell the RM about container requests which are
@@ -299,6 +302,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5139. Update MR AM to use the modified startContainer API after
YARN-486. (Xuan Gong via vinodkv)
+ MAPREDUCE-5151. Update MR AM to use standard exit codes from the API after
+ YARN-444. (Sandy Ryza via vinodkv)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
@@ -851,10 +857,16 @@ Release 0.23.8 - UNRELEASED
IMPROVEMENTS
+ MAPREDUCE-5065. DistCp should skip checksum comparisons if block-sizes
+ are different on source/target (Mithun Radhakrishnan via kihwal)
+
OPTIMIZATIONS
BUG FIXES
+ MAPREDUCE-5015. Coverage fix for org.apache.hadoop.mapreduce.tools.CLI
+ (Aleksey Gorshkov via tgraves)
+
Release 0.23.7 - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1467713-1469041
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1467713-1469041
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Wed Apr 17 19:41:50 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ContainerExitStatus;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -67,7 +68,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.RackResolver;
@@ -624,7 +624,7 @@ public class RMContainerAllocator extend
@VisibleForTesting
public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
TaskAttemptId attemptID) {
- if (cont.getExitStatus() == YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS) {
+ if (cont.getExitStatus() == ContainerExitStatus.ABORTED) {
// killed by framework
return new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_KILL);
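
For context, the hunk above replaces the ABORTED_CONTAINER_EXIT_STATUS
constant from YarnConfiguration with the ContainerExitStatus API introduced
by YARN-444. A minimal sketch of the resulting check, assuming ABORTED keeps
the same semantics (the ExitStatusCheck class itself is hypothetical):

    import org.apache.hadoop.yarn.api.ContainerExitStatus;

    // ExitStatusCheck is a hypothetical helper, not part of this patch.
    final class ExitStatusCheck {
      // Mirrors the test in createContainerFinishedEvent(): an ABORTED
      // exit status means the framework killed the container, so the
      // attempt receives TA_KILL rather than a normal completion event.
      static boolean killedByFramework(int exitStatus) {
        return exitStatus == ContainerExitStatus.ABORTED;
      }
    }
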
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Wed Apr 17 19:41:50 2013
@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.ClusterInf
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -83,7 +84,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
@@ -1660,7 +1660,7 @@ public class TestRMContainerAllocator {
ContainerStatus abortedStatus = BuilderUtils.newContainerStatus(
containerId, ContainerState.RUNNING, "",
- YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS);
+ ContainerExitStatus.ABORTED);
TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
attemptId);
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Wed Apr 17 19:41:50 2013
@@ -52,7 +52,6 @@ public class LineRecordReader extends Re
public static final String MAX_LINE_LENGTH =
"mapreduce.input.linerecordreader.line.maxlength";
- private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
@@ -60,9 +59,9 @@ public class LineRecordReader extends Re
private FSDataInputStream fileIn;
private Seekable filePosition;
private int maxLineLength;
- private LongWritable key = null;
- private Text value = null;
- private CompressionCodec codec;
+ private LongWritable key;
+ private Text value;
+ private boolean isCompressedInput;
private Decompressor decompressor;
private byte[] recordDelimiterBytes;
@@ -81,13 +80,14 @@ public class LineRecordReader extends Re
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
- compressionCodecs = new CompressionCodecFactory(job);
- codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
- if (isCompressedInput()) {
+
+ CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
+ if (null != codec) {
+ isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
@@ -132,19 +132,16 @@ public class LineRecordReader extends Re
this.pos = start;
}
- private boolean isCompressedInput() {
- return (codec != null);
- }
private int maxBytesToConsume(long pos) {
- return isCompressedInput()
+ return isCompressedInput
? Integer.MAX_VALUE
: (int) Math.min(Integer.MAX_VALUE, end - pos);
}
private long getFilePosition() throws IOException {
long retVal;
- if (isCompressedInput() && null != filePosition) {
+ if (isCompressedInput && null != filePosition) {
retVal = filePosition.getPos();
} else {
retVal = pos;
@@ -166,9 +163,6 @@ public class LineRecordReader extends Re
while (getFilePosition() <= end) {
newSize = in.readLine(value, maxLineLength,
Math.max(maxBytesToConsume(pos), maxLineLength));
- if (newSize == 0) {
- break;
- }
pos += newSize;
if (newSize < maxLineLength) {
break;
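
The hunks above carry the MAPREDUCE-4974 optimization: the codec is resolved
once during initialize() and a cached boolean replaces the repeated
isCompressedInput() null check on the per-record path. A minimal sketch of
the pattern, using only the CompressionCodecFactory API shown in the diff
(the CompressionProbe class is illustrative, not the real reader):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    // Illustrative holder for the cached compression flag.
    class CompressionProbe {
      private boolean isCompressedInput;  // computed once, read per record
      private long end;                   // end offset of the split

      void initialize(Configuration conf, Path file, long splitEnd) {
        end = splitEnd;
        // Resolve the codec once; the factory is no longer kept as a field.
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
        isCompressedInput = (codec != null);
      }

      int maxBytesToConsume(long pos) {
        // The hot path now branches on a boolean instead of re-checking
        // whether a codec object is null on every record.
        return isCompressedInput
            ? Integer.MAX_VALUE
            : (int) Math.min(Integer.MAX_VALUE, end - pos);
      }
    }
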
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java Wed Apr 17 19:41:50 2013
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.logaggregation.LogDumper;
@@ -64,8 +65,6 @@ import com.google.common.base.Charsets;
public class CLI extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(CLI.class);
protected Cluster cluster;
- private final Set<String> taskTypes = new HashSet<String>(
- Arrays.asList("map", "reduce", "setup", "cleanup"));
private final Set<String> taskStates = new HashSet<String>(
Arrays.asList("pending", "running", "completed", "failed", "killed"));
@@ -317,6 +316,7 @@ public class CLI extends Configured impl
exitCode = 0;
} else if (displayTasks) {
displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
+ exitCode = 0;
} else if(killTask) {
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
Job job = cluster.getJob(taskID.getJobID());
@@ -563,16 +563,18 @@ public class CLI extends Configured impl
*/
protected void displayTasks(Job job, String type, String state)
throws IOException, InterruptedException {
- if (!taskTypes.contains(type)) {
- throw new IllegalArgumentException("Invalid type: " + type +
- ". Valid types for task are: map, reduce, setup, cleanup.");
- }
+
if (!taskStates.contains(state)) {
throw new java.lang.IllegalArgumentException("Invalid state: " + state +
". Valid states for task are: pending, running, completed, failed, killed.");
}
-
- TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type));
+ TaskReport[] reports = null;
+ try {
+ reports = job.getTaskReports(TaskType.valueOf(type));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid type: " + type +
+ ". Valid types for task are: MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP.");
+ }
for (TaskReport report : reports) {
TIPStatus status = report.getCurrentStatus();
if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
@@ -626,6 +628,6 @@ public class CLI extends Configured impl
public static void main(String[] argv) throws Exception {
int res = ToolRunner.run(new CLI(), argv);
- System.exit(res);
+ ExitUtil.terminate(res);
}
}
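
Replacing System.exit(res) with ExitUtil.terminate(res) preserves the exit
behavior in production but lets tests intercept termination: after
ExitUtil.disableSystemExit(), terminate() throws ExitUtil.ExitException
instead of killing the JVM, which is what the new startStop() test below
relies on. A minimal sketch of the pattern (DemoTool is hypothetical):

    import org.apache.hadoop.util.ExitUtil;

    public class DemoTool {  // hypothetical entry point
      public static void main(String[] args) {
        // In production this exits the JVM with status -1; under test,
        // after ExitUtil.disableSystemExit(), it throws
        // ExitUtil.ExitException whose status field carries the code.
        ExitUtil.terminate(-1);
      }
    }

A test then catches ExitUtil.ExitException, asserts on its status field, and
calls ExitUtil.resetFirstExitException(), exactly as startStop() does in the
TestMRJobClient diff below.
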
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1467713-1469041
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml Wed Apr 17 19:41:50 2013
@@ -154,6 +154,7 @@
<configuration>
<excludes>
<exclude>src/test/java/org/apache/hadoop/cli/data60bytes</exclude>
+ <exclude>src/test/resources/job_1329348432655_0001-10.jhist</exclude>
</excludes>
</configuration>
</plugin>
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java Wed Apr 17 19:41:50 2013
@@ -18,8 +18,10 @@
package org.apache.hadoop.mapreduce;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
@@ -30,35 +32,37 @@ import java.io.PrintStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.tools.CLI;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.junit.Ignore;
-import org.junit.Test;
-@Ignore
+/**
+ * Test the CLI class, which implements the Tool interface.
+ * These tests verify that CLI issues the correct commands with
+ * options and parameters.
+ */
public class TestMRJobClient extends ClusterMapReduceTestCase {
-
+
private static final Log LOG = LogFactory.getLog(TestMRJobClient.class);
-
+
private Job runJob(Configuration conf) throws Exception {
String input = "hello1\nhello2\nhello3\n";
- Job job = MapReduceTestUtil.createJob(conf,
- getInputDir(), getOutputDir(), 1, 1, input);
+ Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
+ 1, 1, input);
job.setJobName("mr");
- job.setPriority(JobPriority.HIGH);
+ job.setPriority(JobPriority.NORMAL);
job.waitForCompletion(true);
return job;
}
-
- public static int runTool(Configuration conf, Tool tool,
- String[] args, OutputStream out) throws Exception {
+
+ public static int runTool(Configuration conf, Tool tool, String[] args,
+ OutputStream out) throws Exception {
PrintStream oldOut = System.out;
PrintStream newOut = new PrintStream(out, true);
try {
@@ -69,20 +73,17 @@ public class TestMRJobClient extends Clu
}
}
- private static class BadOutputFormat
- extends TextOutputFormat {
+ private static class BadOutputFormat extends TextOutputFormat<Object, Object> {
@Override
- public void checkOutputSpecs(JobContext job)
- throws FileAlreadyExistsException, IOException {
+ public void checkOutputSpecs(JobContext job) throws IOException {
throw new IOException();
}
}
-
- @Test
+
public void testJobSubmissionSpecsAndFiles() throws Exception {
Configuration conf = createJobConf();
- Job job = MapReduceTestUtil.createJob(conf,
- getInputDir(), getOutputDir(), 1, 1);
+ Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
+ 1, 1);
job.setOutputFormatClass(BadOutputFormat.class);
try {
job.submit();
@@ -90,60 +91,392 @@ public class TestMRJobClient extends Clu
} catch (Exception e) {
assertTrue(e instanceof IOException);
}
- JobID jobId = job.getJobID();
Cluster cluster = new Cluster(conf);
- Path jobStagingArea = JobSubmissionFiles.getStagingDir(
- cluster,
+ Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
job.getConfiguration());
- Path submitJobDir = new Path(jobStagingArea, jobId.toString());
+ Path submitJobDir = new Path(jobStagingArea, "JobId");
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
- assertFalse(
- "Shouldn't have created a job file if job specs failed.",
- FileSystem.get(conf).exists(submitJobFile)
- );
+ assertFalse("Shouldn't have created a job file if job specs failed.",
+ FileSystem.get(conf).exists(submitJobFile));
}
- @Test
+ /**
+ * main test method
+ */
+
public void testJobClient() throws Exception {
Configuration conf = createJobConf();
Job job = runJob(conf);
+
String jobId = job.getJobID().toString();
- testGetCounter(jobId, conf);
+ // test jobs list
testJobList(jobId, conf);
+ // test job counter
+ testGetCounter(jobId, conf);
+ // status
+ testJobStatus(jobId, conf);
+ // test list of events
+ testJobEvents(jobId, conf);
+ // test job history
+ testJobHistory(conf);
+ // test tracker list
+ testListTrackers(conf);
+ // attempts list
+ testListAttemptIds(jobId, conf);
+ // black list
+ testListBlackList(conf);
+ // test method main and help screen
+ startStop();
+ // test changing the job priority
testChangingJobPriority(jobId, conf);
+ // submit job from file
+ testSubmit(conf);
+ // kill a task
+ testKillTask(job, conf);
+ // fail a task
+ testfailTask(job, conf);
+ // kill job
+ testKillJob(jobId, conf);
+
+ }
+
+ /**
+ * test the fail-task command
+ */
+ private void testfailTask(Job job, Configuration conf) throws Exception {
+ CLI jc = createJobClient();
+ TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
+ TaskAttemptID taid = new TaskAttemptID(tid, 1);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // TaskAttemptId is not set
+ int exitCode = runTool(conf, jc, new String[] { "-fail-task" }, out);
+ assertEquals("Exit code", -1, exitCode);
+
+ try {
+ runTool(conf, jc, new String[] { "-fail-task", taid.toString() }, out);
+ fail(" this task should field");
+ } catch (YarnRemoteException e) {
+ // the task attempt has already completed
+ assertTrue(e.getMessage().contains("_0001_m_000000_1"));
+ }
+ }
+ /**
+ * test the kill-task command
+ */
+ private void testKillTask(Job job, Configuration conf) throws Exception {
+ CLI jc = createJobClient();
+ TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
+ TaskAttemptID taid = new TaskAttemptID(tid, 1);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // bad parameters
+ int exitCode = runTool(conf, jc, new String[] { "-kill-task" }, out);
+ assertEquals("Exit code", -1, exitCode);
+
+ try {
+ runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out);
+ fail(" this task should be killed");
+ } catch (YarnRemoteException e) {
+ // task completed
+ assertTrue(e.getMessage().contains("_0001_m_000000_1"));
+ }
}
+
+ /**
+ * test killing a job
+ */
+ private void testKillJob(String jobId, Configuration conf) throws Exception {
+ CLI jc = createJobClient();
- @Test
- public void testGetCounter(String jobId,
- Configuration conf) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // without jobId
+ int exitCode = runTool(conf, jc, new String[] { "-kill" }, out);
+ assertEquals("Exit code", -1, exitCode);
+ // good parameters
+ exitCode = runTool(conf, jc, new String[] { "-kill", jobId }, out);
+ assertEquals("Exit code", 0, exitCode);
+
+ String answer = new String(out.toByteArray(), "UTF-8");
+ assertTrue(answer.contains("Killed job " + jobId));
+ }
+
+ /**
+ * test submitting a job from a configuration file
+ */
+ private void testSubmit(Configuration conf) throws Exception {
+ CLI jc = createJobClient();
+
+ Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
+ 1, 1, "ping");
+ job.setJobName("mr");
+ job.setPriority(JobPriority.NORMAL);
+
+ File fcon = File.createTempFile("config", ".xml");
+
+ job.getConfiguration().writeXml(new FileOutputStream(fcon));
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // bad parameters
+ int exitCode = runTool(conf, jc, new String[] { "-submit" }, out);
+ assertEquals("Exit code", -1, exitCode);
+
+
+ exitCode = runTool(conf, jc,
+ new String[] { "-submit", "file://" + fcon.getAbsolutePath() }, out);
+ assertEquals("Exit code", 0, exitCode);
+ String answer = new String(out.toByteArray());
+ // verify what was written to the console
+ assertTrue(answer.contains("Created job "));
+ }
+ /**
+ * test starting CLI from the console without options
+ */
+ private void startStop() {
+ ByteArrayOutputStream data = new ByteArrayOutputStream();
+ PrintStream error = System.err;
+ System.setErr(new PrintStream(data));
+ ExitUtil.disableSystemExit();
+ try {
+ CLI.main(new String[0]);
+ fail(" CLI.main should call System.exit");
+
+ } catch (ExitUtil.ExitException e) {
+ ExitUtil.resetFirstExitException();
+ assertEquals(-1, e.status);
+ } catch (Exception e) {
+
+ } finally {
+ System.setErr(error);
+ }
+ // the help text should have been printed to the console
+ String s = new String(data.toByteArray());
+ assertTrue(s.contains("-submit"));
+ assertTrue(s.contains("-status"));
+ assertTrue(s.contains("-kill"));
+ assertTrue(s.contains("-set-priority"));
+ assertTrue(s.contains("-events"));
+ assertTrue(s.contains("-history"));
+ assertTrue(s.contains("-list"));
+ assertTrue(s.contains("-list-active-trackers"));
+ assertTrue(s.contains("-list-blacklisted-trackers"));
+ assertTrue(s.contains("-list-attempt-ids"));
+ assertTrue(s.contains("-kill-task"));
+ assertTrue(s.contains("-fail-task"));
+ assertTrue(s.contains("-logs"));
+
+ }
+ /**
+ * test listing blacklisted trackers
+ */
+ private void testListBlackList(Configuration conf) throws Exception {
+ CLI jc = createJobClient();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ int exitCode = runTool(conf, jc, new String[] {
+ "-list-blacklisted-trackers", "second in" }, out);
+ assertEquals("Exit code", -1, exitCode);
+ exitCode = runTool(conf, jc, new String[] { "-list-blacklisted-trackers" },
+ out);
+ assertEquals("Exit code", 0, exitCode);
+ String line;
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new ByteArrayInputStream(out.toByteArray())));
+ int counter = 0;
+ while ((line = br.readLine()) != null) {
+ LOG.info("line = " + line);
+ counter++;
+ }
+ assertEquals(0, counter);
+ }
+ /**
+ * test printing the attempt-id list
+ */
+ private void testListAttemptIds(String jobId, Configuration conf)
+ throws Exception {
+ CLI jc = createJobClient();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ int exitCode = runTool(conf, jc, new String[] { "-list-attempt-ids" }, out);
+ assertEquals("Exit code", -1, exitCode);
+ exitCode = runTool(conf, jc, new String[] { "-list-attempt-ids", jobId,
+ "MAP", "completed" }, out);
+ assertEquals("Exit code", 0, exitCode);
+ String line;
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new ByteArrayInputStream(out.toByteArray())));
+ int counter = 0;
+ while ((line = br.readLine()) != null) {
+ LOG.info("line = " + line);
+ counter++;
+ }
+ assertEquals(1, counter);
+ }
+ /**
+ * print tracker list
+ */
+ private void testListTrackers(Configuration conf) throws Exception {
+ CLI jc = createJobClient();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ int exitCode = runTool(conf, jc, new String[] { "-list-active-trackers",
+ "second parameter" }, out);
+ assertEquals("Exit code", -1, exitCode);
+ exitCode = runTool(conf, jc, new String[] { "-list-active-trackers" }, out);
+ assertEquals("Exit code", 0, exitCode);
+ String line;
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new ByteArrayInputStream(out.toByteArray())));
+ int counter = 0;
+ while ((line = br.readLine()) != null) {
+ LOG.info("line = " + line);
+ counter++;
+ }
+ assertEquals(2, counter);
+ }
+ /**
+ * print job history from file
+ */
+ private void testJobHistory(Configuration conf) throws Exception {
+ CLI jc = createJobClient();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ File f = new File("src/test/resources/job_1329348432655_0001-10.jhist");
+ // bad command
+ int exitCode = runTool(conf, jc, new String[] { "-history", "pul",
+ "file://" + f.getAbsolutePath() }, out);
+ assertEquals("Exit code", -1, exitCode);
+
+ exitCode = runTool(conf, jc, new String[] { "-history", "all",
+ "file://" + f.getAbsolutePath() }, out);
+ assertEquals("Exit code", 0, exitCode);
+ String line;
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new ByteArrayInputStream(out.toByteArray())));
+ int counter = 0;
+ while ((line = br.readLine()) != null) {
+ LOG.info("line = " + line);
+ if (line.startsWith("task_")) {
+ counter++;
+ }
+ }
+ assertEquals(23, counter);
+ }
+ /**
+ * print job events list
+ */
+ private void testJobEvents(String jobId, Configuration conf) throws Exception {
+ CLI jc = createJobClient();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ int exitCode = runTool(conf, jc, new String[] { "-events" }, out);
+ assertEquals("Exit code", -1, exitCode);
+
+ exitCode = runTool(conf, jc, new String[] { "-events", jobId, "0", "100" },
+ out);
+ assertEquals("Exit code", 0, exitCode);
+ String line;
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new ByteArrayInputStream(out.toByteArray())));
+ int counter = 0;
+ String attemptId = ("attempt" + jobId.substring(3));
+ while ((line = br.readLine()) != null) {
+ LOG.info("line = " + line);
+ if (line.contains(attemptId)) {
+ counter++;
+ }
+ }
+ assertEquals(2, counter);
+ }
+ /**
+ * print job status
+ */
+ private void testJobStatus(String jobId, Configuration conf) throws Exception {
+ CLI jc = createJobClient();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // bad options
+ int exitCode = runTool(conf, jc, new String[] { "-status" }, out);
+ assertEquals("Exit code", -1, exitCode);
+
+ exitCode = runTool(conf, jc, new String[] { "-status", jobId }, out);
+ assertEquals("Exit code", 0, exitCode);
+ String line;
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new ByteArrayInputStream(out.toByteArray())));
+
+ while ((line = br.readLine()) != null) {
+ LOG.info("line = " + line);
+ if (!line.contains("Job state:")) {
+ continue;
+ }
+ break;
+ }
+ assertNotNull(line);
+ assertTrue(line.contains("SUCCEEDED"));
+ }
+ /**
+ * print counters
+ */
+ public void testGetCounter(String jobId, Configuration conf) throws Exception {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // bad command
int exitCode = runTool(conf, createJobClient(),
+ new String[] { "-counter", }, out);
+ assertEquals("Exit code", -1, exitCode);
+
+ exitCode = runTool(conf, createJobClient(),
new String[] { "-counter", jobId,
- "org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" },
+ "org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" },
out);
assertEquals("Exit code", 0, exitCode);
assertEquals("Counter", "3", out.toString().trim());
}
+ /**
+ * print a job list
+ */
+ protected void testJobList(String jobId, Configuration conf) throws Exception {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // bad options
- @Test
- public void testJobList(String jobId,
- Configuration conf) throws Exception {
- verifyJobPriority(jobId, "HIGH", conf, createJobClient());
- }
+ int exitCode = runTool(conf, createJobClient(), new String[] { "-list",
+ "alldata" }, out);
+ assertEquals("Exit code", -1, exitCode);
+ exitCode = runTool(conf, createJobClient(),
+ // all jobs
+ new String[] { "-list", "all" }, out);
+ assertEquals("Exit code", 0, exitCode);
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new ByteArrayInputStream(out.toByteArray())));
+ String line;
+ int counter = 0;
+ while ((line = br.readLine()) != null) {
+ LOG.info("line = " + line);
+ if (line.contains(jobId)) {
+ counter++;
+ }
+ }
+ assertEquals(1, counter);
+ out.reset();
+ // only submitted
+ exitCode = runTool(conf, createJobClient(), new String[] { "-list" }, out);
+ assertEquals("Exit code", 0, exitCode);
+ br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
+ out.toByteArray())));
+ counter = 0;
+ while ((line = br.readLine()) != null) {
+ LOG.info("line = " + line);
+ if (line.contains(jobId)) {
+ counter++;
+ }
+ }
+ // the submitted job should still appear in the plain -list output
+ assertEquals(1, counter);
+ }
+
protected void verifyJobPriority(String jobId, String priority,
Configuration conf, CLI jc) throws Exception {
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream(pis);
- int exitCode = runTool(conf, jc,
- new String[] { "-list", "all" },
- pos);
+ int exitCode = runTool(conf, jc, new String[] { "-list", "all" }, pos);
assertEquals("Exit code", 0, exitCode);
BufferedReader br = new BufferedReader(new InputStreamReader(pis));
- String line = null;
+ String line;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
- if (!line.startsWith(jobId)) {
+ if (!line.contains(jobId)) {
continue;
}
assertTrue(line.contains(priority));
@@ -152,63 +485,16 @@ public class TestMRJobClient extends Clu
pis.close();
}
- @Test
public void testChangingJobPriority(String jobId, Configuration conf)
throws Exception {
int exitCode = runTool(conf, createJobClient(),
- new String[] { "-set-priority", jobId, "VERY_LOW" },
- new ByteArrayOutputStream());
+ new String[] { "-set-priority" }, new ByteArrayOutputStream());
+ assertEquals("Exit code", -1, exitCode);
+ exitCode = runTool(conf, createJobClient(), new String[] { "-set-priority",
+ jobId, "VERY_LOW" }, new ByteArrayOutputStream());
assertEquals("Exit code", 0, exitCode);
- verifyJobPriority(jobId, "VERY_LOW", conf, createJobClient());
- }
-
- @Test
- public void testMissingProfileOutput() throws Exception {
- Configuration conf = createJobConf();
- final String input = "hello1\n";
-
- // Set a job to be profiled with an empty agentlib parameter.
- // This will fail to create profile.out files for tasks.
- // This will succeed by skipping the HTTP fetch of the
- // profiler output.
- Job job = MapReduceTestUtil.createJob(conf,
- getInputDir(), getOutputDir(), 1, 1, input);
- job.setJobName("disable-profile-fetch");
- job.setProfileEnabled(true);
- job.setProfileParams("-agentlib:,verbose=n,file=%s");
- job.setMaxMapAttempts(1);
- job.setMaxReduceAttempts(1);
- job.setJobSetupCleanupNeeded(false);
- job.waitForCompletion(true);
-
- // Run another job with an hprof agentlib param; verify
- // that the HTTP fetch works here.
- Job job2 = MapReduceTestUtil.createJob(conf,
- getInputDir(), getOutputDir(), 1, 1, input);
- job2.setJobName("enable-profile-fetch");
- job2.setProfileEnabled(true);
- job2.setProfileParams(
- "-agentlib:hprof=cpu=samples,heap=sites,force=n,"
- + "thread=y,verbose=n,file=%s");
- job2.setProfileTaskRange(true, "0-1");
- job2.setProfileTaskRange(false, "");
- job2.setMaxMapAttempts(1);
- job2.setMaxReduceAttempts(1);
- job2.setJobSetupCleanupNeeded(false);
- job2.waitForCompletion(true);
-
- // Find the first map task, verify that we got its profile output file.
- TaskReport [] reports = job2.getTaskReports(TaskType.MAP);
- assertTrue("No task reports found!", reports.length > 0);
- TaskReport report = reports[0];
- TaskID id = report.getTaskId();
- assertTrue(TaskType.MAP == id.getTaskType());
- System.out.println("Using task id: " + id);
- TaskAttemptID attemptId = new TaskAttemptID(id, 0);
-
- File profileOutFile = new File(attemptId.toString() + ".profile");
- assertTrue("Couldn't find profiler output", profileOutFile.exists());
- assertTrue("Couldn't remove profiler output", profileOutFile.delete());
+ // verify NORMAL rather than VERY_LOW: set-priority is not implemented yet
+ verifyJobPriority(jobId, "NORMAL", conf, createJobClient());
}
protected CLI createJobClient() throws IOException {