You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2016/12/13 13:20:00 UTC

[29/48] oozie git commit: OOZIE-2742 Unable to kill applications based on tag (satishsaley via rohini)

OOZIE-2742 Unable to kill applications based on tag (satishsaley via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ed1e2520
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ed1e2520
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ed1e2520

Branch: refs/heads/oya
Commit: ed1e25208c57b8c15150feb2f49f57edfba7b340
Parents: 28ac958
Author: Rohini Palaniswamy <ro...@apache.org>
Authored: Wed Nov 23 09:16:01 2016 -0800
Committer: Rohini Palaniswamy <ro...@apache.org>
Committed: Wed Nov 23 09:16:01 2016 -0800

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java |  15 ++-
 .../action/hadoop/TestJavaActionExecutor.java   | 117 +++++++++----------
 .../action/hadoop/LauncherMainHadoopUtils.java  |  16 +--
 release-log.txt                                 |   1 +
 4 files changed, 79 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/ed1e2520/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index f315af7..4beeb96 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -25,6 +25,7 @@ import java.net.ConnectException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DiskChecker;
@@ -69,6 +71,7 @@ import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.ShareLibService;
 import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.service.UserGroupInformationService;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.util.ELEvaluationException;
 import org.apache.oozie.util.ELEvaluator;
@@ -1594,7 +1597,7 @@ public class JavaActionExecutor extends ActionExecutor {
         boolean exception = false;
         try {
             Element actionXml = XmlUtils.parseXml(action.getConf());
-            JobConf jobConf = createBaseHadoopConf(context, actionXml);
+            final JobConf jobConf = createBaseHadoopConf(context, actionXml);
             WorkflowJob wfJob = context.getWorkflow();
             Configuration conf = null;
             if ( wfJob.getConf() != null ) {
@@ -1603,7 +1606,15 @@ public class JavaActionExecutor extends ActionExecutor {
             String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), action);
             jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag));
             jobConf.set(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, Long.toString(action.getStartTime().getTime()));
-            LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
+            UserGroupInformation ugi = Services.get().get(UserGroupInformationService.class)
+                    .getProxyUser(context.getWorkflow().getUser());
+            ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
+                    return null;
+                }
+            });
             jobClient = createJobClient(context, jobConf);
             RunningJob runningJob = getRunningJob(context, action, jobClient);
             if (runningJob != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/ed1e2520/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 75301db..8965cdf 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -544,69 +544,64 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
     }
 
     public void testChildKill() throws Exception {
-        if (HadoopShims.isYARN()) {
-            final JobConf clusterConf = createJobConf();
-            FileSystem fileSystem = FileSystem.get(clusterConf);
-            Path confFile = new Path("/tmp/cluster-conf.xml");
-            OutputStream out = fileSystem.create(confFile);
-            clusterConf.writeXml(out);
-            out.close();
-            String confFileName = fileSystem.makeQualified(confFile).toString() + "#core-site.xml";
-            final String actionXml = "<java>" +
-                    "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
-                    "<name-node>" + getNameNodeUri() + "</name-node>" +
-                    "<main-class> " + SleepJob.class.getName() + " </main-class>" +
-                    "<arg>-mt</arg>" +
-                    "<arg>300000</arg>" +
-                    "<archive>" + confFileName + "</archive>" +
-                    "</java>";
-            final Context context = createContext(actionXml, null);
-            final RunningJob runningJob = submitAction(context);
-            waitFor(60 * 1000, new Predicate() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    return runningJob.getJobStatus().getRunState() == 1;
-                }
-            });
-            assertFalse(runningJob.isComplete());
-            Thread.sleep(15000);
-            UserGroupInformationService ugiService = Services.get().
-                    get(UserGroupInformationService.class);
-
-            UserGroupInformation ugi = ugiService.getProxyUser(getTestUser());
-            ugi.doAs(new PrivilegedExceptionAction<Object>() {
-                @Override
-                public Void run() throws Exception {
-                    JavaActionExecutor ae = new JavaActionExecutor();
-                    ae.kill(context, context.getAction());
-
-                    WorkflowJob wfJob = context.getWorkflow();
-                    Configuration conf = null;
-                    if (wfJob.getConf() != null) {
-                        conf = new XConfiguration(new StringReader(wfJob.getConf()));
-                    }
-                    String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), context.getAction());
-                    Configuration jobConf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
-                    jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag));
-                    jobConf.setLong(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME,
-                            context.getAction().getStartTime().getTime());
-                    Set<String> childSet = LauncherMainHadoopUtils.getChildJobs(jobConf);
-                    assertEquals(1, childSet.size());
-
-                    JobClient jobClient = new JobClient(clusterConf);
-                    for (String jobId : childSet) {
-                        RunningJob childJob = jobClient.getJob(jobId);
-                        assertEquals(JobStatus.State.KILLED.getValue(), childJob.getJobStatus().getRunState());
-                    }
-                    assertTrue(ae.isCompleted(context.getAction().getExternalStatus()));
-                    return null;
-                }
-            });
+        final JobConf clusterConf = createJobConf();
+        FileSystem fileSystem = FileSystem.get(clusterConf);
+        Path confFile = new Path("/tmp/cluster-conf.xml");
+        OutputStream out = fileSystem.create(confFile);
+        clusterConf.writeXml(out);
+        out.close();
+        String confFileName = fileSystem.makeQualified(confFile).toString() + "#core-site.xml";
+        final String actionXml = "<java>" +
+                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                "<name-node>" + getNameNodeUri() + "</name-node>" +
+                "<main-class> " + SleepJob.class.getName() + " </main-class>" +
+                "<arg>-mt</arg>" +
+                "<arg>300000</arg>" +
+                "<archive>" + confFileName + "</archive>" +
+                "</java>";
+        final Context context = createContext(actionXml, null);
+        final RunningJob runningJob = submitAction(context);
+        waitFor(60 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return runningJob.getJobStatus().getRunState() == 1;
+            }
+        });
+        assertFalse(runningJob.isComplete());
+        Thread.sleep(15000);
+        JavaActionExecutor ae = new JavaActionExecutor();
+        ae.kill(context, context.getAction());
 
-            assertEquals(WorkflowAction.Status.DONE, context.getAction().getStatus());
-            assertEquals("KILLED", context.getAction().getExternalStatus());
-            assertFalse(runningJob.isSuccessful());
+        WorkflowJob wfJob = context.getWorkflow();
+        Configuration conf = null;
+        if (wfJob.getConf() != null) {
+            conf = new XConfiguration(new StringReader(wfJob.getConf()));
         }
+        String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), context.getAction());
+        final Configuration jobConf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
+        jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag));
+        jobConf.setLong(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, context.getAction().getStartTime().getTime());
+
+        UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
+        UserGroupInformation ugi = ugiService.getProxyUser(getTestUser());
+        Set<String> childSet = ugi.doAs(new PrivilegedExceptionAction<Set<String>>() {
+            @Override
+            public Set<String> run() throws Exception {
+                Set<String> childSet = LauncherMainHadoopUtils.getChildJobs(jobConf);
+                return childSet;
+            }
+        });
+        assertEquals(1, childSet.size());
+
+        JobClient jobClient = new JobClient(clusterConf);
+        for (String jobId : childSet) {
+            RunningJob childJob = jobClient.getJob(jobId);
+            assertEquals(JobStatus.State.KILLED.getValue(), childJob.getJobStatus().getRunState());
+        }
+        assertTrue(ae.isCompleted(context.getAction().getExternalStatus()));
+        assertEquals(WorkflowAction.Status.DONE, context.getAction().getStatus());
+        assertEquals("KILLED", context.getAction().getExternalStatus());
+        assertFalse(runningJob.isSuccessful());
     }
 
         public void testExceptionSubmitException() throws Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/ed1e2520/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index 5fda0ef..0cf2e90 100644
--- a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -23,6 +23,9 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
@@ -53,13 +56,12 @@ public class LauncherMainHadoopUtils {
         System.out.println("tag id : " + tag);
         long startTime = 0L;
         try {
-            try {
-                if(actionConf.get(OOZIE_JOB_LAUNCH_TIME) != null) {
-                    startTime = Long.parseLong(actionConf.get(OOZIE_JOB_LAUNCH_TIME));
-                }
-                else {
-                    startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME));
-                }
+            if(actionConf.get(OOZIE_JOB_LAUNCH_TIME) != null) {
+                startTime = Long.parseLong(actionConf.get(OOZIE_JOB_LAUNCH_TIME));
+            }
+            else {
+                startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME));
+            }
         } catch(NumberFormatException nfe) {
             throw new RuntimeException("Could not find Oozie job launch time", nfe);
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ed1e2520/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 52640dd..bd7219c 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -11,6 +11,7 @@ OOZIE-2634 Queue dump command message is confusing when the queue is empty (andr
 
 -- Oozie 4.3.0 release
 
+OOZIE-2742 Unable to kill applications based on tag (satishsaley via rohini)
 OOZIE-2720 Test failure - TestCoordMaterializeTriggerService#testMaxMatThrottleNotPicked (gezapeti via rohini)
 OOZIE-2536 Hadoop's cleanup of local directory in uber mode causing failures (satishsaley via rohini)
 OOZIE-2723 JSON.org license is now CatX (rkanter, abhishekbafna via shwethags)