You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2013/04/17 21:41:54 UTC

svn commit: r1469042 - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...

Author: szetszwo
Date: Wed Apr 17 19:41:50 2013
New Revision: 1469042

URL: http://svn.apache.org/r1469042
Log:
Merge r1467713 through r1469041 from trunk.

Added:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/job_1329348432655_0001-10.jhist
      - copied unchanged from r1469041, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/job_1329348432655_0001-10.jhist
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1467713-1469041

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Wed Apr 17 19:41:50 2013
@@ -200,6 +200,9 @@ Release 2.0.5-beta - UNRELEASED
 
   OPTIMIZATIONS
 
+    MAPREDUCE-4974. Optimising the LineRecordReader initialize() method 
+    (Gelesh via bobby)
+
   BUG FIXES
 
     MAPREDUCE-4671. AM does not tell the RM about container requests which are
@@ -299,6 +302,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5139. Update MR AM to use the modified startContainer API after
     YARN-486. (Xuan Gong via vinodkv)
 
+    MAPREDUCE-5151. Update MR AM to use standard exit codes from the API after
+    YARN-444. (Sandy Ryza via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -851,10 +857,16 @@ Release 0.23.8 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-5065. DistCp should skip checksum comparisons if block-sizes 
+    are different on source/target (Mithun Radhakrishnan via kihwal)
+
   OPTIMIZATIONS
 
   BUG FIXES
 
+    MAPREDUCE-5015. Coverage fix for org.apache.hadoop.mapreduce.tools.CLI
+    (Aleksey Gorshkov via tgraves)
+
 Release 0.23.7 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1467713-1469041

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1467713-1469041

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Wed Apr 17 19:41:50 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -67,7 +68,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
 
@@ -624,7 +624,7 @@ public class RMContainerAllocator extend
   @VisibleForTesting
   public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
       TaskAttemptId attemptID) {
-    if (cont.getExitStatus() == YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS) {
+    if (cont.getExitStatus() == ContainerExitStatus.ABORTED) {
       // killed by framework
       return new TaskAttemptEvent(attemptID,
           TaskAttemptEventType.TA_KILL);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Wed Apr 17 19:41:50 2013
@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.ClusterInf
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -83,7 +84,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
@@ -1660,7 +1660,7 @@ public class TestRMContainerAllocator {
 
     ContainerStatus abortedStatus = BuilderUtils.newContainerStatus(
         containerId, ContainerState.RUNNING, "",
-        YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS);
+        ContainerExitStatus.ABORTED);
     
     TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
         attemptId);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Wed Apr 17 19:41:50 2013
@@ -52,7 +52,6 @@ public class LineRecordReader extends Re
   public static final String MAX_LINE_LENGTH = 
     "mapreduce.input.linerecordreader.line.maxlength";
 
-  private CompressionCodecFactory compressionCodecs = null;
   private long start;
   private long pos;
   private long end;
@@ -60,9 +59,9 @@ public class LineRecordReader extends Re
   private FSDataInputStream fileIn;
   private Seekable filePosition;
   private int maxLineLength;
-  private LongWritable key = null;
-  private Text value = null;
-  private CompressionCodec codec;
+  private LongWritable key;
+  private Text value;
+  private boolean isCompressedInput;
   private Decompressor decompressor;
   private byte[] recordDelimiterBytes;
 
@@ -81,13 +80,14 @@ public class LineRecordReader extends Re
     start = split.getStart();
     end = start + split.getLength();
     final Path file = split.getPath();
-    compressionCodecs = new CompressionCodecFactory(job);
-    codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
     final FileSystem fs = file.getFileSystem(job);
     fileIn = fs.open(file);
-    if (isCompressedInput()) {
+    
+    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
+    if (null!=codec) {
+      isCompressedInput = true;	
       decompressor = CodecPool.getDecompressor(codec);
       if (codec instanceof SplittableCompressionCodec) {
         final SplitCompressionInputStream cIn =
@@ -132,19 +132,16 @@ public class LineRecordReader extends Re
     this.pos = start;
   }
   
-  private boolean isCompressedInput() {
-    return (codec != null);
-  }
 
   private int maxBytesToConsume(long pos) {
-    return isCompressedInput()
+    return isCompressedInput
       ? Integer.MAX_VALUE
       : (int) Math.min(Integer.MAX_VALUE, end - pos);
   }
 
   private long getFilePosition() throws IOException {
     long retVal;
-    if (isCompressedInput() && null != filePosition) {
+    if (isCompressedInput && null != filePosition) {
       retVal = filePosition.getPos();
     } else {
       retVal = pos;
@@ -166,9 +163,6 @@ public class LineRecordReader extends Re
     while (getFilePosition() <= end) {
       newSize = in.readLine(value, maxLineLength,
           Math.max(maxBytesToConsume(pos), maxLineLength));
-      if (newSize == 0) {
-        break;
-      }
       pos += newSize;
       if (newSize < maxLineLength) {
         break;

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java Wed Apr 17 19:41:50 2013
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
 import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.logaggregation.LogDumper;
@@ -64,8 +65,6 @@ import com.google.common.base.Charsets;
 public class CLI extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(CLI.class);
   protected Cluster cluster;
-  private final Set<String> taskTypes = new HashSet<String>(
-              Arrays.asList("map", "reduce", "setup", "cleanup"));
   private final Set<String> taskStates = new HashSet<String>(
               Arrays.asList("pending", "running", "completed", "failed", "killed"));
 
@@ -317,6 +316,7 @@ public class CLI extends Configured impl
         exitCode = 0;
       } else if (displayTasks) {
         displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
+        exitCode = 0;
       } else if(killTask) {
         TaskAttemptID taskID = TaskAttemptID.forName(taskid);
         Job job = cluster.getJob(taskID.getJobID());
@@ -563,16 +563,18 @@ public class CLI extends Configured impl
    */
   protected void displayTasks(Job job, String type, String state) 
   throws IOException, InterruptedException {
-    if (!taskTypes.contains(type)) {
-      throw new IllegalArgumentException("Invalid type: " + type + 
-          ". Valid types for task are: map, reduce, setup, cleanup.");
-    }
+
     if (!taskStates.contains(state)) {
       throw new java.lang.IllegalArgumentException("Invalid state: " + state + 
           ". Valid states for task are: pending, running, completed, failed, killed.");
     }
-
-    TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type));
+    TaskReport[] reports=null;
+    try{
+      reports = job.getTaskReports(TaskType.valueOf(type));
+    }catch(IllegalArgumentException e){
+      throw new IllegalArgumentException("Invalid type: " + type + 
+          ". Valid types for task are: MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP.");
+    }
     for (TaskReport report : reports) {
       TIPStatus status = report.getCurrentStatus();
       if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
@@ -626,6 +628,6 @@ public class CLI extends Configured impl
   
   public static void main(String[] argv) throws Exception {
     int res = ToolRunner.run(new CLI(), argv);
-    System.exit(res);
+    ExitUtil.terminate(res);
   }
 }

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1467713-1469041

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml Wed Apr 17 19:41:50 2013
@@ -154,6 +154,7 @@
         <configuration>
           <excludes>
             <exclude>src/test/java/org/apache/hadoop/cli/data60bytes</exclude>
+            <exclude>src/test/resources/job_1329348432655_0001-10.jhist</exclude>
           </excludes>
         </configuration>
       </plugin>

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java?rev=1469042&r1=1469041&r2=1469042&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java Wed Apr 17 19:41:50 2013
@@ -18,8 +18,10 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
@@ -30,35 +32,37 @@ import java.io.PrintStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.tools.CLI;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 
-import org.junit.Ignore;
-import org.junit.Test;
-@Ignore
+/**
+ test CLI class. CLI class implemented  the Tool interface. 
+ Here test that CLI sends correct command with options and parameters. 
+ */
 public class TestMRJobClient extends ClusterMapReduceTestCase {
-  
+
   private static final Log LOG = LogFactory.getLog(TestMRJobClient.class);
-  
+
   private Job runJob(Configuration conf) throws Exception {
     String input = "hello1\nhello2\nhello3\n";
 
-    Job job = MapReduceTestUtil.createJob(conf,
-      getInputDir(), getOutputDir(), 1, 1, input);
+    Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
+        1, 1, input);
     job.setJobName("mr");
-    job.setPriority(JobPriority.HIGH);
+    job.setPriority(JobPriority.NORMAL);
     job.waitForCompletion(true);
     return job;
   }
-  
-  public static int runTool(Configuration conf, Tool tool,
-      String[] args, OutputStream out) throws Exception {
+
+  public static int runTool(Configuration conf, Tool tool, String[] args,
+      OutputStream out) throws Exception {
     PrintStream oldOut = System.out;
     PrintStream newOut = new PrintStream(out, true);
     try {
@@ -69,20 +73,17 @@ public class TestMRJobClient extends Clu
     }
   }
 
-  private static class BadOutputFormat
-    extends TextOutputFormat {
+  private static class BadOutputFormat extends TextOutputFormat<Object, Object> {
     @Override
-    public void checkOutputSpecs(JobContext job)
-        throws FileAlreadyExistsException, IOException {
+    public void checkOutputSpecs(JobContext job) throws IOException {
       throw new IOException();
     }
   }
-
-  @Test
+  
   public void testJobSubmissionSpecsAndFiles() throws Exception {
     Configuration conf = createJobConf();
-    Job job = MapReduceTestUtil.createJob(conf,
-          getInputDir(), getOutputDir(), 1, 1);
+    Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
+        1, 1);
     job.setOutputFormatClass(BadOutputFormat.class);
     try {
       job.submit();
@@ -90,60 +91,392 @@ public class TestMRJobClient extends Clu
     } catch (Exception e) {
       assertTrue(e instanceof IOException);
     }
-    JobID jobId = job.getJobID();
     Cluster cluster = new Cluster(conf);
-    Path jobStagingArea = JobSubmissionFiles.getStagingDir(
-        cluster,
+    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
         job.getConfiguration());
-    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
+    Path submitJobDir = new Path(jobStagingArea, "JobId");
     Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
-    assertFalse(
-        "Shouldn't have created a job file if job specs failed.",
-        FileSystem.get(conf).exists(submitJobFile)
-    );
+    assertFalse("Shouldn't have created a job file if job specs failed.",
+        FileSystem.get(conf).exists(submitJobFile));
   }
 
-  @Test
+  /**
+   * main test method
+   */
+
   public void testJobClient() throws Exception {
     Configuration conf = createJobConf();
     Job job = runJob(conf);
+
     String jobId = job.getJobID().toString();
-    testGetCounter(jobId, conf);
+    // test jobs list
     testJobList(jobId, conf);
+    // test job counter
+    testGetCounter(jobId, conf);
+    // status
+    testJobStatus(jobId, conf);
+    // test list of events
+    testJobEvents(jobId, conf);
+    // test job history
+    testJobHistory(conf);
+    // test tracker list
+    testListTrackers(conf);
+    // attempts list
+    testListAttemptIds(jobId, conf);
+    // black list
+    testListBlackList(conf);
+    // test method main and help screen
+    startStop();
+    // test a change job priority .
     testChangingJobPriority(jobId, conf);
+    // submit job from file
+    testSubmit(conf);
+    // kill a task
+    testKillTask(job, conf);
+    // fail a task
+    testfailTask(job, conf);
+    // kill job
+    testKillJob(jobId, conf);
+
+  }
+
+  /**
+   * test fail task
+   */
+  private void testfailTask(Job job, Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
+    TaskAttemptID taid = new TaskAttemptID(tid, 1);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    //  TaskAttemptId is not set
+    int exitCode = runTool(conf, jc, new String[] { "-fail-task" }, out);
+    assertEquals("Exit code", -1, exitCode);
+
+    try {
+      runTool(conf, jc, new String[] { "-fail-task", taid.toString() }, out);
+      fail(" this task should field");
+    } catch (YarnRemoteException e) {
+      // task completed !
+      assertTrue(e.getMessage().contains("_0001_m_000000_1"));
+    }
+  }
+  /**
+   * test a kill task
+   */ 
+  private void testKillTask(Job job, Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
+    TaskAttemptID taid = new TaskAttemptID(tid, 1);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // bad parameters
+    int exitCode = runTool(conf, jc, new String[] { "-kill-task" }, out);
+    assertEquals("Exit code", -1, exitCode);
+
+    try {
+      runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out);
+      fail(" this task should be killed");
+    } catch (YarnRemoteException e) {
+      // task completed
+      assertTrue(e.getMessage().contains("_0001_m_000000_1"));
+    }
   }
+  
+  /**
+   * test a kill job
+   */
+  private void testKillJob(String jobId, Configuration conf) throws Exception {
+    CLI jc = createJobClient();
 
-  @Test
-  public void testGetCounter(String jobId,
-      Configuration conf) throws Exception {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // without jobId
+    int exitCode = runTool(conf, jc, new String[] { "-kill" }, out);
+    assertEquals("Exit code", -1, exitCode);
+    // good parameters
+    exitCode = runTool(conf, jc, new String[] { "-kill", jobId }, out);
+    assertEquals("Exit code", 0, exitCode);
+    
+    String answer = new String(out.toByteArray(), "UTF-8");
+    assertTrue(answer.contains("Killed job " + jobId));
+  }
+
+  /**
+   * test submit task from file
+   */
+  private void testSubmit(Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+
+    Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
+        1, 1, "ping");
+    job.setJobName("mr");
+    job.setPriority(JobPriority.NORMAL);
+
+    File fcon = File.createTempFile("config", ".xml");
+
+    job.getConfiguration().writeXml(new FileOutputStream(fcon));
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // bad parameters
+    int exitCode = runTool(conf, jc, new String[] { "-submit" }, out);
+    assertEquals("Exit code", -1, exitCode);
+    
+    
+    exitCode = runTool(conf, jc,
+        new String[] { "-submit", "file://" + fcon.getAbsolutePath() }, out);
+    assertEquals("Exit code", 0, exitCode);
+    String answer = new String(out.toByteArray());
+    // in console was written
+    assertTrue(answer.contains("Created job "));
+  }
+  /**
+   * test start form console command without options
+   */
+  private void startStop() {
+    ByteArrayOutputStream data = new ByteArrayOutputStream();
+    PrintStream error = System.err;
+    System.setErr(new PrintStream(data));
+    ExitUtil.disableSystemExit();
+    try {
+      CLI.main(new String[0]);
+      fail(" CLI.main should call System.exit");
+
+    } catch (ExitUtil.ExitException e) {
+      ExitUtil.resetFirstExitException();
+      assertEquals(-1, e.status);
+    } catch (Exception e) {
+
+    } finally {
+      System.setErr(error);
+    }
+    // in console should be written help text 
+    String s = new String(data.toByteArray());
+    assertTrue(s.contains("-submit"));
+    assertTrue(s.contains("-status"));
+    assertTrue(s.contains("-kill"));
+    assertTrue(s.contains("-set-priority"));
+    assertTrue(s.contains("-events"));
+    assertTrue(s.contains("-history"));
+    assertTrue(s.contains("-list"));
+    assertTrue(s.contains("-list-active-trackers"));
+    assertTrue(s.contains("-list-blacklisted-trackers"));
+    assertTrue(s.contains("-list-attempt-ids"));
+    assertTrue(s.contains("-kill-task"));
+    assertTrue(s.contains("-fail-task"));
+    assertTrue(s.contains("-logs"));
+
+  }
+  /**
+   * black list 
+   */
+  private void testListBlackList(Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    int exitCode = runTool(conf, jc, new String[] {
+        "-list-blacklisted-trackers", "second in" }, out);
+    assertEquals("Exit code", -1, exitCode);
+    exitCode = runTool(conf, jc, new String[] { "-list-blacklisted-trackers" },
+        out);
+    assertEquals("Exit code", 0, exitCode);
+    String line;
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+    int counter = 0;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      counter++;
+    }
+    assertEquals(0, counter);
+  }
+  /**
+   * print AttemptIds list 
+   */
+  private void testListAttemptIds(String jobId, Configuration conf)
+      throws Exception {
+    CLI jc = createJobClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    int exitCode = runTool(conf, jc, new String[] { "-list-attempt-ids" }, out);
+    assertEquals("Exit code", -1, exitCode);
+    exitCode = runTool(conf, jc, new String[] { "-list-attempt-ids", jobId,
+        "MAP", "completed" }, out);
+    assertEquals("Exit code", 0, exitCode);
+    String line;
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+    int counter = 0;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      counter++;
+    }
+    assertEquals(1, counter);
+  }
+  /**
+   * print tracker list
+   */
+  private void testListTrackers(Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    int exitCode = runTool(conf, jc, new String[] { "-list-active-trackers",
+        "second parameter" }, out);
+    assertEquals("Exit code", -1, exitCode);
+    exitCode = runTool(conf, jc, new String[] { "-list-active-trackers" }, out);
+    assertEquals("Exit code", 0, exitCode);
+    String line;
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+    int counter = 0;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      counter++;
+    }
+    assertEquals(2, counter);
+  }
+  /**
+   * print job history from file 
+   */
+  private void testJobHistory(Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    File f = new File("src/test/resources/job_1329348432655_0001-10.jhist");
+    // bad command
+    int exitCode = runTool(conf, jc, new String[] { "-history", "pul",
+        "file://" + f.getAbsolutePath() }, out);
+    assertEquals("Exit code", -1, exitCode);
+
+    exitCode = runTool(conf, jc, new String[] { "-history", "all",
+        "file://" + f.getAbsolutePath() }, out);
+    assertEquals("Exit code", 0, exitCode);
+    String line;
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+    int counter = 0;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      if (line.startsWith("task_")) {
+        counter++;
+      }
+    }
+    assertEquals(23, counter);
+  }
+  /**
+   * print job events list 
+   */
+  private void testJobEvents(String jobId, Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    int exitCode = runTool(conf, jc, new String[] { "-events" }, out);
+    assertEquals("Exit code", -1, exitCode);
+
+    exitCode = runTool(conf, jc, new String[] { "-events", jobId, "0", "100" },
+        out);
+    assertEquals("Exit code", 0, exitCode);
+    String line;
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+    int counter = 0;
+    String attemptId = ("attempt" + jobId.substring(3));
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      if (line.contains(attemptId)) {
+        counter++;
+      }
+    }
+    assertEquals(2, counter);
+  }
+  /**
+   * print job status 
+   */
+  private void testJobStatus(String jobId, Configuration conf) throws Exception {
+    CLI jc = createJobClient();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // bad options
+    int exitCode = runTool(conf, jc, new String[] { "-status" }, out);
+    assertEquals("Exit code", -1, exitCode);
+
+    exitCode = runTool(conf, jc, new String[] { "-status", jobId }, out);
+    assertEquals("Exit code", 0, exitCode);
+    String line;
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      if (!line.contains("Job state:")) {
+        continue;
+      }
+      break;
+    }
+    assertNotNull(line);
+    assertTrue(line.contains("SUCCEEDED"));
+  }
+  /**
+   * print counters 
+   */
+  public void testGetCounter(String jobId, Configuration conf) throws Exception {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // bad command 
     int exitCode = runTool(conf, createJobClient(),
+        new String[] { "-counter", }, out);
+    assertEquals("Exit code", -1, exitCode);
+    
+    exitCode = runTool(conf, createJobClient(),
         new String[] { "-counter", jobId,
-        "org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" },
+            "org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" },
         out);
     assertEquals("Exit code", 0, exitCode);
     assertEquals("Counter", "3", out.toString().trim());
   }
+  /**
+   * print a job list 
+   */
+  protected void testJobList(String jobId, Configuration conf) throws Exception {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    // bad options
 
-  @Test
-  public void testJobList(String jobId,
-      Configuration conf) throws Exception {
-    verifyJobPriority(jobId, "HIGH", conf, createJobClient());
-  }
+    int exitCode = runTool(conf, createJobClient(), new String[] { "-list",
+        "alldata" }, out);
+    assertEquals("Exit code", -1, exitCode);
+    exitCode = runTool(conf, createJobClient(),
+        // all jobs
+        new String[] { "-list", "all" }, out);
+    assertEquals("Exit code", 0, exitCode);
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+        new ByteArrayInputStream(out.toByteArray())));
+    String line;
+    int counter = 0;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      if (line.contains(jobId)) {
+        counter++;
+      }
+    }
+    assertEquals(1, counter);
+    out.reset();
+    // only submitted 
+    exitCode = runTool(conf, createJobClient(), new String[] { "-list" }, out);
+    assertEquals("Exit code", 0, exitCode);
+    br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
+        out.toByteArray())));
+    counter = 0;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      if (line.contains(jobId)) {
+        counter++;
+      }
+    }
+    // all jobs submitted! no current
+    assertEquals(1, counter);
 
+  }
+  
   protected void verifyJobPriority(String jobId, String priority,
       Configuration conf, CLI jc) throws Exception {
     PipedInputStream pis = new PipedInputStream();
     PipedOutputStream pos = new PipedOutputStream(pis);
-    int exitCode = runTool(conf, jc,
-        new String[] { "-list", "all" },
-        pos);
+    int exitCode = runTool(conf, jc, new String[] { "-list", "all" }, pos);
     assertEquals("Exit code", 0, exitCode);
     BufferedReader br = new BufferedReader(new InputStreamReader(pis));
-    String line = null;
+    String line;
     while ((line = br.readLine()) != null) {
       LOG.info("line = " + line);
-      if (!line.startsWith(jobId)) {
+      if (!line.contains(jobId)) {
         continue;
       }
       assertTrue(line.contains(priority));
@@ -152,63 +485,16 @@ public class TestMRJobClient extends Clu
     pis.close();
   }
 
-  @Test
   public void testChangingJobPriority(String jobId, Configuration conf)
       throws Exception {
     int exitCode = runTool(conf, createJobClient(),
-        new String[] { "-set-priority", jobId, "VERY_LOW" },
-        new ByteArrayOutputStream());
+        new String[] { "-set-priority" }, new ByteArrayOutputStream());
+    assertEquals("Exit code", -1, exitCode);
+    exitCode = runTool(conf, createJobClient(), new String[] { "-set-priority",
+        jobId, "VERY_LOW" }, new ByteArrayOutputStream());
     assertEquals("Exit code", 0, exitCode);
-    verifyJobPriority(jobId, "VERY_LOW", conf, createJobClient());
-  }
-
-  @Test
-  public void testMissingProfileOutput() throws Exception {
-    Configuration conf = createJobConf();
-    final String input = "hello1\n";
-
-    // Set a job to be profiled with an empty agentlib parameter.
-    // This will fail to create profile.out files for tasks.
-    // This will succeed by skipping the HTTP fetch of the
-    // profiler output.
-    Job job = MapReduceTestUtil.createJob(conf,
-        getInputDir(), getOutputDir(), 1, 1, input);
-    job.setJobName("disable-profile-fetch");
-    job.setProfileEnabled(true);
-    job.setProfileParams("-agentlib:,verbose=n,file=%s");
-    job.setMaxMapAttempts(1);
-    job.setMaxReduceAttempts(1);
-    job.setJobSetupCleanupNeeded(false);
-    job.waitForCompletion(true);
-
-    // Run another job with an hprof agentlib param; verify
-    // that the HTTP fetch works here.
-    Job job2 = MapReduceTestUtil.createJob(conf,
-        getInputDir(), getOutputDir(), 1, 1, input);
-    job2.setJobName("enable-profile-fetch");
-    job2.setProfileEnabled(true);
-    job2.setProfileParams(
-        "-agentlib:hprof=cpu=samples,heap=sites,force=n,"
-        + "thread=y,verbose=n,file=%s");
-    job2.setProfileTaskRange(true, "0-1");
-    job2.setProfileTaskRange(false, "");
-    job2.setMaxMapAttempts(1);
-    job2.setMaxReduceAttempts(1);
-    job2.setJobSetupCleanupNeeded(false);
-    job2.waitForCompletion(true);
-
-    // Find the first map task, verify that we got its profile output file.
-    TaskReport [] reports = job2.getTaskReports(TaskType.MAP);
-    assertTrue("No task reports found!", reports.length > 0);
-    TaskReport report = reports[0];
-    TaskID id = report.getTaskId();
-    assertTrue(TaskType.MAP == id.getTaskType());
-    System.out.println("Using task id: " + id);
-    TaskAttemptID attemptId = new TaskAttemptID(id, 0);
-
-    File profileOutFile = new File(attemptId.toString() + ".profile");
-    assertTrue("Couldn't find profiler output", profileOutFile.exists());
-    assertTrue("Couldn't remove profiler output", profileOutFile.delete());
+    // because this method does not implemented still.
+    verifyJobPriority(jobId, "NORMAL", conf, createJobClient());
   }
 
   protected CLI createJobClient() throws IOException {