You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/05/14 07:52:11 UTC

svn commit: r1482208 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/had...

Author: vinodkv
Date: Tue May 14 05:52:11 2013
New Revision: 1482208

URL: http://svn.apache.org/r1482208
Log:
MAPREDUCE-5222. Bring back some methods and constants in Jobclient for binary compatibility with mapred in 1.x. Contributed by Karthik Kambatla.

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1482208&r1=1482207&r2=1482208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue May 14 05:52:11 2013
@@ -230,6 +230,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5157. Bring back old sampler related code so that we can support
     binary compatibility with hadoop-1 sorter example. (Zhijie Shen via vinodkv)
 
+    MAPREDUCE-5222. Bring back some methods and constants in Jobclient for
+    binary compatibility with mapred in 1.x. (Karthik Kambatla via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4974. Optimising the LineRecordReader initialize() method 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java?rev=1482208&r1=1482207&r2=1482208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java Tue May 14 05:52:11 2013
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -27,6 +31,9 @@ import org.junit.Assert;
 import org.junit.Test;
 
 public class TestJobClient {
+  final static String TEST_DIR = new File(System.getProperty("test.build.data",
+      "/tmp")).getAbsolutePath();
+
   @Test
   public void testGetClusterStatusWithLocalJobRunner() throws Exception {
     Configuration conf = new Configuration();
@@ -43,4 +50,32 @@ public class TestJobClient {
         .getBlackListedTrackersInfo();
     Assert.assertEquals(0, blackListedTrackersInfo.size());
   }
+
+  @Test(timeout = 1000)
+  public void testIsJobDirValid() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path testDir = new Path(TEST_DIR);
+    Assert.assertFalse(JobClient.isJobDirValid(testDir, fs));
+
+    Path jobconf = new Path(testDir, "job.xml");
+    Path jobsplit = new Path(testDir, "job.split");
+    fs.create(jobconf);
+    fs.create(jobsplit);
+    Assert.assertTrue(JobClient.isJobDirValid(testDir, fs));
+    
+    fs.delete(jobconf, true);
+    fs.delete(jobsplit, true);
+  }
+  
+  @Test(timeout = 1000)
+  public void testGetStagingAreaDir() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    JobClient client = new JobClient(conf);
+
+    Assert.assertTrue(
+        "Mismatch in paths",
+        client.getClusterHandle().getStagingAreaDir().toString()
+            .equals(client.getStagingAreaDir().toString()));
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java?rev=1482208&r1=1482207&r2=1482208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java Tue May 14 05:52:11 2013
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -136,6 +137,20 @@ import org.apache.hadoop.util.ToolRunner
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class JobClient extends CLI {
+
+  @InterfaceAudience.Private
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
+      "mapreduce.jobclient.retry.policy.enabled";
+  @InterfaceAudience.Private
+  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
+      false;
+  @InterfaceAudience.Private
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
+      "mapreduce.jobclient.retry.policy.spec";
+  @InterfaceAudience.Private
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
+      "10000,6,60000,10"; // t1,n1,t2,n2,...
+
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
   
@@ -525,6 +540,12 @@ public class JobClient extends CLI {
    */
   public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                   IOException {
+    return submitJobInternal(conf);
+  }
+
+  @InterfaceAudience.Private
+  public RunningJob submitJobInternal(final JobConf conf)
+      throws FileNotFoundException, IOException {
     try {
       conf.setBooleanIfUnset("mapred.mapper.new-api", false);
       conf.setBooleanIfUnset("mapred.reducer.new-api", false);
@@ -958,6 +979,50 @@ public class JobClient extends CLI {
     }
   }
 
+  /**
+   * Checks if the job directory is clean and has all the required components
+   * for (re) starting the job
+   */
+  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
+      throws IOException {
+    FileStatus[] contents = fs.listStatus(jobDirPath);
+    int matchCount = 0;
+    if (contents != null && contents.length >= 2) {
+      for (FileStatus status : contents) {
+        if ("job.xml".equals(status.getPath().getName())) {
+          ++matchCount;
+        }
+        if ("job.split".equals(status.getPath().getName())) {
+          ++matchCount;
+        }
+      }
+      if (matchCount == 2) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Fetch the staging area directory for the application
+   * 
+   * @return path to staging area directory
+   * @throws IOException
+   */
+  public Path getStagingAreaDir() throws IOException {
+    try {
+      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
+        @Override
+        public Path run() throws IOException, InterruptedException {
+          return cluster.getStagingAreaDir();
+        }
+      });
+    } catch (InterruptedException ie) {
+      // throw RuntimeException instead for compatibility reasons
+      throw new RuntimeException(ie);
+    }
+  }
+
   private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
     JobQueueInfo ret = new JobQueueInfo(queue);
     // make sure to convert any children