You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2016/12/13 13:19:59 UTC

[28/48] oozie git commit: OOZIE-2594 finalize the impl of MapReduceActionExecutor.kill(), adding tests

OOZIE-2594 finalize the impl of MapReduceActionExecutor.kill(), adding tests

Change-Id: I09dce58bbd3c7f4534210394e35f6681a62b9bc9


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8d60f7f2
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8d60f7f2
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8d60f7f2

Branch: refs/heads/oya
Commit: 8d60f7f25647ff0839e62d3e245d8c6f875c57b1
Parents: 095c584
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Wed Nov 23 13:56:58 2016 +0100
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Wed Nov 23 14:56:02 2016 +0100

----------------------------------------------------------------------
 .../action/hadoop/MapReduceActionExecutor.java  | 40 +++++++++++----
 .../oozie/action/hadoop/BlockingMapper.java     | 52 ++++++++++++++++++++
 .../action/hadoop/MapperReducerForTest.java     | 10 ++--
 .../hadoop/TestMapReduceActionExecutor.java     | 45 +++++++++++++++++
 4 files changed, 133 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/8d60f7f2/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index e97de7e..11d1787 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -50,6 +50,11 @@ import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
 import org.jdom.Namespace;
 
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.io.Closeables;
+
 public class MapReduceActionExecutor extends JavaActionExecutor {
 
     public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
@@ -401,16 +406,15 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
     }
 
     @Override
-    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
+    public void kill(final Context context, final WorkflowAction action) throws ActionExecutorException {
         // Kill the LauncherAM which submits the MR job
         super.kill(context, action);
 
         // We have to check whether the MapReduce execution has started or not. If it has started, then we have to get
         // the YARN ApplicationID based on the tag and kill it as well
-
-        // TODO: this must be tested in TestMapReduceActionExecutor
+        YarnClient yarnClient = null;
         try {
-            String tag = ActionExecutor.getActionYarnTag(new Configuration(), context.getWorkflow(), action);
+            String tag = LauncherMapperHelper.getTag(ActionExecutor.getActionYarnTag(new Configuration(), context.getWorkflow(), action));
             GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
             gar.setScope(ApplicationsRequestScope.ALL);
             gar.setApplicationTags(Collections.singleton(tag));
@@ -420,16 +424,34 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
             GetApplicationsResponse apps = proxy.getApplications(gar);
             List<ApplicationReport> appsList = apps.getApplicationList();
 
-            YarnClient yarnClient = YarnClient.createYarnClient();
-            yarnClient.init(actionConf);
-            yarnClient.start();
+            if (appsList.size() > 1) {
+                String applications = Joiner.on(",").join(Iterables.transform(appsList, new Function<ApplicationReport, String>() {
+                    @Override
+                    public String apply(ApplicationReport input) {
+                        return input.toString();
+                    }
+                }));
+
+                LOG.error("Too many applications were returned: {0}", applications);
+                throw new IllegalArgumentException("Too many applications were returned");
+            } else if (appsList.size() == 1) {
 
-            for (ApplicationReport app : appsList) {
-                LOG.info("Killing MapReduce job {0}", app.getApplicationId().toString());
+                yarnClient = YarnClient.createYarnClient();
+                yarnClient.init(actionConf);
+                yarnClient.start();
+
+                ApplicationReport app = appsList.get(0);
+                LOG.info("Killing MapReduce job {0}, YARN Id: {1}", action.getExternalChildIDs(), app.getApplicationId().toString());
                 yarnClient.killApplication(app.getApplicationId());
+            } else {
+                LOG.info("No MapReduce job to kill");
             }
         } catch (Exception e) {
             throw convertException(e);
+        } finally {
+            if (yarnClient != null) {
+                Closeables.closeQuietly(yarnClient);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d60f7f2/core/src/test/java/org/apache/oozie/action/hadoop/BlockingMapper.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/BlockingMapper.java b/core/src/test/java/org/apache/oozie/action/hadoop/BlockingMapper.java
new file mode 100644
index 0000000..0f4dcd6
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/BlockingMapper.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+// Test-only mapper whose map() waits on its own monitor; nothing in this class ever calls notify()
+public class BlockingMapper implements Mapper<Object, Object, Object, Object> {
+
+    @Override
+    public void configure(JobConf job) {
+        // nop - the JobConf is not read
+    }
+
+    @Override
+    public void close() throws IOException {
+        // nop - this mapper holds nothing that needs releasing
+    }
+
+    @Override
+    public void map(Object key, Object value, OutputCollector<Object, Object> output, Reporter reporter)
+            throws IOException {
+        try {
+            synchronized (this) { // Object.wait() may only be called while holding the object's monitor
+                wait(); // no timeout; key/value/output/reporter are never touched, so no record is emitted
+            }
+        } catch (InterruptedException e) { // an interrupt is rethrown unchecked, with the cause preserved
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d60f7f2/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java b/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java
index 8f08ddd..75ac716 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.Reducer;
 import java.io.IOException;
 import java.util.Iterator;
 
-public class MapperReducerForTest implements Mapper, Reducer {
+public class MapperReducerForTest implements Mapper<Object, Object, Object, Object>, Reducer<Object, Object, Object, Object> {
     public static final String GROUP = "g";
     public static final String NAME = "c";
     /**
@@ -66,14 +66,14 @@ public class MapperReducerForTest implements Mapper, Reducer {
     public void close() throws IOException {
     }
 
-    @SuppressWarnings("unchecked")
-    public void map(Object key, Object value, OutputCollector collector, Reporter reporter) throws IOException {
+    @Override
+    public void map(Object key, Object value, OutputCollector<Object, Object> collector, Reporter reporter) throws IOException {
         collector.collect(key, value);
         reporter.incrCounter(GROUP, NAME, 5l);
     }
 
-    @SuppressWarnings("unchecked")
-    public void reduce(Object key, Iterator values, OutputCollector collector, Reporter reporter)
+    @Override
+    public void reduce(Object key, Iterator<Object> values, OutputCollector<Object, Object> collector, Reporter reporter)
             throws IOException {
         while (values.hasNext()) {
             collector.collect(key, values.next());

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d60f7f2/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index a21b7c7..78936c4 100644
--- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.streaming.StreamJob;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
@@ -494,6 +495,12 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         assertTrue(MapperReducerCredentialsForTest.hasCredentials(mrJob));
     }
 
+    protected XConfiguration getSleepMapReduceConfig(String inputDir, String outputDir) { // variant of getMapReduceConfig
+        XConfiguration conf = getMapReduceConfig(inputDir, outputDir); // start from the regular test MR configuration
+        conf.set("mapred.mapper.class", BlockingMapper.class.getName()); // replace the mapper with the one that waits in map()
+        return conf;
+    }
+
     protected XConfiguration getMapReduceConfig(String inputDir, String outputDir) {
         XConfiguration conf = new XConfiguration();
         conf.set("mapred.mapper.class", MapperReducerForTest.class.getName());
@@ -654,6 +661,44 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         assertTrue(errorProps.getProperty("exception.stacktrace").startsWith(OozieActionConfiguratorException.class.getName()));
     }
 
+    public void testMapReduceActionKill() throws Exception { // checks that kill() leaves the child MR job in state KILLED
+        FileSystem fs = getFileSystem();
+
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); // input file with two dummy lines
+        w.write("dummy\n");
+        w.write("dummy\n");
+        w.close();
+
+        String actionXml = "<map-reduce>" + "<job-tracker>" + getResourceManagerUri() + "</job-tracker>" + "<name-node>"
+                + getNameNodeUri() + "</name-node>"
+                + getSleepMapReduceConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>"; // config that uses BlockingMapper
+
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        // wait until LauncherAM terminates - the MR job keeps running in the background
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
+        MapReduceActionExecutor mae = new MapReduceActionExecutor();
+        mae.check(context, context.getAction());  // must be called so that externalChildIDs are read from HDFS
+        JobConf conf = mae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
+        String user = conf.get("user.name");
+        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+        final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); // handle to the child MR job
+
+        mae.kill(context, context.getAction()); // the call under test
+
+        waitFor(10_000, new Predicate() { // poll until the job reports completion; timeout presumably in ms - TODO confirm
+            @Override
+            public boolean evaluate() throws Exception {
+                return mrJob.isComplete();
+            }
+        });
+        assertEquals(JobStatus.State.KILLED, mrJob.getJobStatus().getState()); // completed because it was killed, not because it succeeded or failed
+    }
+
     public void testMapReduceWithCredentials() throws Exception {
         FileSystem fs = getFileSystem();