You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2020/10/02 15:30:26 UTC

[hadoop] branch trunk updated: MAPREDUCE-7298. Distcp doesn't close the job after the job is completed. Contributed by Aasha Medhi.

This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 18fa439  MAPREDUCE-7298. Distcp doesn't close the job after the job is completed. Contributed by Aasha Medhi.
18fa439 is described below

commit 18fa4397e6dc7663bcc7c7309126f45eb8d3fa17
Author: Arpit Agarwal <aa...@cloudera.com>
AuthorDate: Fri Oct 2 08:29:55 2020 -0700

    MAPREDUCE-7298. Distcp doesn't close the job after the job is completed. Contributed by Aasha Medhi.
    
    Change-Id: I63d249bbb18ccedaeee9f10123a78e32f9e54ed2
---
 .../main/java/org/apache/hadoop/tools/DistCp.java  | 15 +++++++++--
 .../org/apache/hadoop/tools/TestExternalCall.java  | 31 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
index c36335a..6f8ab2b 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
@@ -127,6 +127,7 @@ public class DistCp extends Configured implements Tool {
    * to target location, by:
    *  1. Creating a list of files to be copied to target.
    *  2. Launching a Map-only job to copy the files. (Delegates to execute().)
+   *  The MR job is closed as part of run only if it is a blocking call to run.
    * @param argv List of arguments passed to DistCp, from the ToolRunner.
    * @return On success, it returns 0. Else, -1.
    */
@@ -148,9 +149,10 @@ public class DistCp extends Configured implements Tool {
       OptionsParser.usage();      
       return DistCpConstants.INVALID_ARGUMENT;
     }
-    
+
+    Job job = null;
     try {
-      execute();
+      job = execute();
     } catch (InvalidInputException e) {
       LOG.error("Invalid input: ", e);
       return DistCpConstants.INVALID_ARGUMENT;
@@ -166,6 +168,15 @@ public class DistCp extends Configured implements Tool {
     } catch (Exception e) {
       LOG.error("Exception encountered ", e);
       return DistCpConstants.UNKNOWN_ERROR;
+    } finally {
+      // Blocking distcp, so close the job now that it has completed.
+      if (job != null && context.shouldBlock()) {
+        try {
+          job.close();
+        } catch (IOException e) {
+          LOG.error("Exception encountered while closing distcp job", e);
+        }
+      }
     }
     return DistCpConstants.SUCCESS;
   }
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestExternalCall.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestExternalCall.java
index 06122e6..eba86a9 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestExternalCall.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestExternalCall.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.tools;
 
+import org.apache.hadoop.mapreduce.Job;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +37,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.security.Permission;
 
+import static org.mockito.Mockito.*;
+
 public class TestExternalCall {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestExternalCall.class);
@@ -134,6 +138,33 @@ public class TestExternalCall {
 
   }
 
+  /**
+   * Verifies that DistCp.run, invoked without -async, closes the Job returned by execute() exactly once.
+   * @throws Exception propagated from test setup (staging dir, test files) or the DistCp/Job calls
+   */
+  @Test
+  public void testCleanupOfJob() throws Exception {
+
+    Configuration conf = getConf();
+    // Create the MR staging directory up front so the run has somewhere to stage the job.
+    Path stagingDir = JobSubmissionFiles.getStagingDir(new Cluster(conf),
+      conf);
+    stagingDir.getFileSystem(conf).mkdirs(stagingDir);
+    Path soure = createFile("tmp.txt");
+    Path target = createFile("target.txt");
+    // Mock DistCp so execute() hands back a spied Job; run() is the only real DistCp code exercised.
+    DistCp distcp = mock(DistCp.class);
+    Job job = spy(Job.class);
+    Mockito.when(distcp.getConf()).thenReturn(conf);
+    Mockito.when(distcp.execute()).thenReturn(job);
+    Mockito.when(distcp.run(Mockito.any())).thenCallRealMethod();
+    String[] arg = { soure.toString(), target.toString() };
+    // No -async option is passed, so run() is expected to be blocking and therefore close the job.
+    distcp.run(arg);
+    Mockito.verify(job, times(1)).close();
+  }
+
+
   private SecurityManager securityManager;
 
   protected static class ExitException extends SecurityException {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org