You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original page and was not preserved in this plain-text rendering.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/09/04 12:35:20 UTC

oozie git commit: OOZIE-3061 Kill only those child jobs which are not already killed (matijhs via gezapeti, andras.piros)

Repository: oozie
Updated Branches:
  refs/heads/master 9735dd38c -> 6323a8e43


OOZIE-3061 Kill only those child jobs which are not already killed (matijhs via gezapeti, andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6323a8e4
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6323a8e4
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6323a8e4

Branch: refs/heads/master
Commit: 6323a8e43e4d70bcad69059a5e162f48cd0ed43c
Parents: 9735dd3
Author: Andras Piros <an...@cloudera.com>
Authored: Tue Sep 4 14:35:09 2018 +0200
Committer: Andras Piros <an...@cloudera.com>
Committed: Tue Sep 4 14:35:09 2018 +0200

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java | 93 +++++++++++++-------
 release-log.txt                                 |  1 +
 .../oozie/action/hadoop/LauncherMain.java       | 48 +++++++---
 .../oozie/action/hadoop/TestLauncherMain.java   | 40 +++++++--
 4 files changed, 135 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/6323a8e4/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 8f0f244..05fac39 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
@@ -1780,36 +1781,11 @@ public class JavaActionExecutor extends ActionExecutor {
             String launcherTag = getActionYarnTag(context, action);
             jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag));
             yarnClient = createYarnClient(context, jobConf);
-            if(action.getExternalId() != null) {
-                try {
-                    LOG.info("Killing action {0}'s external application {1}", action.getId(), action.getExternalId());
-                    yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId()));
-                } catch (Exception e) {
-                    LOG.warn("Could not kill {0}", action.getExternalId(), e);
-                }
-            }
-            String externalChildIDs = action.getExternalChildIDs();
-            if(externalChildIDs != null) {
-                for(String childId : externalChildIDs.split(",")) {
-                    try {
-                        LOG.info("Killing action {0}'s external child application {1}", action.getId(), childId);
-                        yarnClient.killApplication(ConverterUtils.toApplicationId(childId.trim()));
-                    } catch (Exception e) {
-                        LOG.warn("Could not kill external child of {0}, {1}", action.getExternalId(),
-                                childId, e);
-                    }
-                }
-            }
-            for(ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL,
-                    action.getStartTime().getTime())){
-                try {
-                    LOG.info("Killing action {0}'s external child application {1} based on tags",
-                            action.getId(), id.toString());
-                    yarnClient.killApplication(id);
-                } catch (Exception e) {
-                    LOG.warn("Could not kill child of {0}, {1}", action.getExternalId(), id, e);
-                }
-            }
+
+            String appExternalId = action.getExternalId();
+            killExternalApp(action, yarnClient, appExternalId);
+            killExternalChildApp(action, yarnClient, appExternalId);
+            killExternalChildAppByTags(action, yarnClient, jobConf, appExternalId);
 
             context.setExternalStatus(KILLED);
             context.setExecutionData(KILLED, null);
@@ -1828,6 +1804,63 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
+    private boolean finalAppStatusUndefined(ApplicationReport appReport) {
+        FinalApplicationStatus status = appReport.getFinalApplicationStatus();
+        return !FinalApplicationStatus.SUCCEEDED.equals(status) &&
+                !FinalApplicationStatus.FAILED.equals(status) &&
+                !FinalApplicationStatus.KILLED.equals(status);
+    }
+
+    void killExternalApp(WorkflowAction action, YarnClient yarnClient, String appExternalId)
+            throws YarnException, IOException {
+        if (appExternalId != null) {
+            ApplicationId appId = ConverterUtils.toApplicationId(appExternalId);
+            if (finalAppStatusUndefined(yarnClient.getApplicationReport(appId))) {
+                try {
+                    LOG.info("Killing action {0}''s external application {1}", action.getId(), appExternalId);
+                    yarnClient.killApplication(appId);
+                } catch (Exception e) {
+                    LOG.warn("Could not kill {0}", appExternalId, e);
+                }
+            }
+        }
+    }
+
+    void killExternalChildApp(WorkflowAction action, YarnClient yarnClient, String appExternalId)
+            throws YarnException, IOException {
+        String externalChildIDs = action.getExternalChildIDs();
+        if (externalChildIDs != null) {
+            for (String childId : externalChildIDs.split(",")) {
+                ApplicationId appChildId = ConverterUtils.toApplicationId(childId.trim());
+                if (finalAppStatusUndefined(yarnClient.getApplicationReport(appChildId))) {
+                    try {
+                        LOG.info("Killing action {0}''s external child application {1}", action.getId(), childId);
+                        yarnClient.killApplication(appChildId);
+                    } catch (Exception e) {
+                        LOG.warn("Could not kill external child of {0}, {1}",
+                                appExternalId, childId, e);
+                    }
+                }
+            }
+        }
+    }
+
+    void killExternalChildAppByTags(WorkflowAction action, YarnClient yarnClient, Configuration jobConf, String appExternalId)
+            throws YarnException, IOException {
+        for (ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL,
+                action.getStartTime().getTime())) {
+            if (finalAppStatusUndefined(yarnClient.getApplicationReport(id))) {
+                try {
+                    LOG.info("Killing action {0}''s external child application {1} based on tags",
+                            action.getId(), id.toString());
+                    yarnClient.killApplication(id);
+                } catch (Exception e) {
+                    LOG.warn("Could not kill child of {0}, {1}", appExternalId, id, e);
+                }
+            }
+        }
+    }
+
     private String getActionYarnTag(Context context, WorkflowAction action) {
         return LauncherHelper.getActionYarnTag(context.getProtoActionConf(), context.getWorkflow().getParentId(), action);
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/6323a8e4/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ce945b4..caacb1f 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3061 Kill only those child jobs which are not already killed (matijhs via gezapeti, andras.piros)
 OOZIE-3155 [ui] Job DAG is not refreshed when a job is finished (asalamon74 via andras.piros)
 OOZIE-3334 Don't use org.apache.hadoop.hbase.security.User in HDFSCredentials (gezapeti)
 OOZIE-3210 [build] Revision information is empty (asalamon74 via andras.piros)

http://git-wip-us.apache.org/repos/asf/oozie/blob/6323a8e4/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index e6e182c..c9e2a91 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -31,7 +31,9 @@ import java.io.OutputStreamWriter;
 import java.io.StringWriter;
 import java.io.Writer;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
@@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -202,6 +205,7 @@ public abstract class LauncherMain {
             System.out.print("Could not find YARN tags property " + CHILD_MAPREDUCE_JOB_TAGS);
             return childYarnJobs;
         }
+
         System.out.println("tag id : " + tag);
         GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
         gar.setScope(scope);
@@ -254,24 +258,44 @@ public abstract class LauncherMain {
         try {
             Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf);
             if (!childYarnJobs.isEmpty()) {
-                System.out.println();
-                System.out.println("Found [" + childYarnJobs.size() + "] YARN application(s) from this launcher");
-                System.out.println("Killing existing applications and starting over:");
-                YarnClient yarnClient = YarnClient.createYarnClient();
-                yarnClient.init(actionConf);
-                yarnClient.start();
-                for (ApplicationId app : childYarnJobs) {
-                    System.out.print("Killing [" + app + "] ... ");
-                    yarnClient.killApplication(app);
-                    System.out.println("Done");
-                }
-                System.out.println();
+                checkAndKillChildYarnJobs(YarnClient.createYarnClient(), actionConf, childYarnJobs);
             }
         } catch (IOException | YarnException ye) {
             throw new RuntimeException("Exception occurred while killing child job(s)", ye);
         }
     }
 
+    @VisibleForTesting
+    protected static Collection<ApplicationId> checkAndKillChildYarnJobs(YarnClient yarnClient,
+                                                                         Configuration actionConf,
+                                                                         Collection<ApplicationId> childYarnJobs)
+            throws YarnException, IOException {
+
+        System.out.println();
+        System.out.println("Found [" + childYarnJobs.size() + "] YARN application(s) from this launcher");
+        System.out.println("Killing existing applications and starting over:");
+        yarnClient.init(actionConf);
+        yarnClient.start();
+        Collection<ApplicationId> killedapps = new ArrayList<>();
+        for (ApplicationId app : childYarnJobs) {
+            if (finalAppStatusUndefined(yarnClient.getApplicationReport(app))) {
+                System.out.print("Killing [" + app + "] ... ");
+                yarnClient.killApplication(app);
+                System.out.println("Done");
+                killedapps.add(app);
+            }
+        }
+        System.out.println();
+        return killedapps;
+    }
+
+    private static boolean finalAppStatusUndefined(ApplicationReport appReport) {
+        FinalApplicationStatus status = appReport.getFinalApplicationStatus();
+        return !FinalApplicationStatus.SUCCEEDED.equals(status) &&
+                !FinalApplicationStatus.FAILED.equals(status) &&
+                !FinalApplicationStatus.KILLED.equals(status);
+    }
+
     protected abstract void run(String[] args) throws Exception;
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/6323a8e4/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java
index 5fb7cf5..b613978 100644
--- a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java
+++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java
@@ -18,10 +18,11 @@
 
 package org.apache.oozie.action.hadoop;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.conf.Configuration;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -29,10 +30,16 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Properties;
 
-import org.apache.hadoop.conf.Configuration;
 import org.junit.rules.TemporaryFolder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -110,4 +117,27 @@ public class TestLauncherMain {
         String contents = new String(Files.readAllBytes(f.toPath()));
         assertTrue(contents.contains("foo=bar"));
     }
+
+    @Test
+    public void testKillChildYarnJobs() throws Exception {
+        YarnClient yc = Mockito.mock(YarnClient.class);
+        ApplicationReport ar = Mockito.mock(ApplicationReport.class);
+        Mockito.when(yc.getApplicationReport(Mockito.any(ApplicationId.class))).thenReturn(ar);
+
+        Mockito.when(ar.getFinalApplicationStatus())
+                .thenReturn(FinalApplicationStatus.UNDEFINED)
+                .thenReturn(FinalApplicationStatus.FAILED)
+                .thenReturn(FinalApplicationStatus.KILLED);
+
+        ApplicationId appz[] = {
+                ApplicationId.newInstance(System.currentTimeMillis(), 1),
+                ApplicationId.newInstance(System.currentTimeMillis(), 2),
+                ApplicationId.newInstance(System.currentTimeMillis(), 3)
+        };
+
+        Collection<ApplicationId> result = LauncherMain.checkAndKillChildYarnJobs(yc, null, Arrays.asList(appz));
+
+        assertEquals(1, result.size());
+        assertEquals(appz[0].getId(), result.iterator().next().getId());
+    }
 }