You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2016/12/13 13:19:59 UTC
[28/48] oozie git commit: OOZIE-2594 finalize the impl of
MapReduceActionExecutor.kill(), adding tests
OOZIE-2594 finalize the impl of MapReduceActionExecutor.kill(), adding tests
Change-Id: I09dce58bbd3c7f4534210394e35f6681a62b9bc9
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8d60f7f2
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8d60f7f2
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8d60f7f2
Branch: refs/heads/oya
Commit: 8d60f7f25647ff0839e62d3e245d8c6f875c57b1
Parents: 095c584
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Wed Nov 23 13:56:58 2016 +0100
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Wed Nov 23 14:56:02 2016 +0100
----------------------------------------------------------------------
.../action/hadoop/MapReduceActionExecutor.java | 40 +++++++++++----
.../oozie/action/hadoop/BlockingMapper.java | 52 ++++++++++++++++++++
.../action/hadoop/MapperReducerForTest.java | 10 ++--
.../hadoop/TestMapReduceActionExecutor.java | 45 +++++++++++++++++
4 files changed, 133 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/8d60f7f2/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index e97de7e..11d1787 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -50,6 +50,11 @@ import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.io.Closeables;
+
public class MapReduceActionExecutor extends JavaActionExecutor {
public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
@@ -401,16 +406,15 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
}
@Override
- public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
+ public void kill(final Context context, final WorkflowAction action) throws ActionExecutorException {
// Kill the LauncherAM which submits the MR job
super.kill(context, action);
// We have to check whether the MapReduce execution has started or not. If it has started, then we have to get
// the YARN ApplicationID based on the tag and kill it as well
-
- // TODO: this must be tested in TestMapReduceActionExecutor
+ YarnClient yarnClient = null;
try {
- String tag = ActionExecutor.getActionYarnTag(new Configuration(), context.getWorkflow(), action);
+ String tag = LauncherMapperHelper.getTag(ActionExecutor.getActionYarnTag(new Configuration(), context.getWorkflow(), action));
GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
gar.setScope(ApplicationsRequestScope.ALL);
gar.setApplicationTags(Collections.singleton(tag));
@@ -420,16 +424,34 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
GetApplicationsResponse apps = proxy.getApplications(gar);
List<ApplicationReport> appsList = apps.getApplicationList();
- YarnClient yarnClient = YarnClient.createYarnClient();
- yarnClient.init(actionConf);
- yarnClient.start();
+ if (appsList.size() > 1) {
+ String applications = Joiner.on(",").join(Iterables.transform(appsList, new Function<ApplicationReport, String>() {
+ @Override
+ public String apply(ApplicationReport input) {
+ return input.toString();
+ }
+ }));
+
+ LOG.error("Too many applications were returned: {0}", applications);
+ throw new IllegalArgumentException("Too many applications were returned");
+ } else if (appsList.size() == 1) {
- for (ApplicationReport app : appsList) {
- LOG.info("Killing MapReduce job {0}", app.getApplicationId().toString());
+ yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(actionConf);
+ yarnClient.start();
+
+ ApplicationReport app = appsList.get(0);
+ LOG.info("Killing MapReduce job {0}, YARN Id: {1}", action.getExternalChildIDs(), app.getApplicationId().toString());
yarnClient.killApplication(app.getApplicationId());
+ } else {
+ LOG.info("No MapReduce job to kill");
}
} catch (Exception e) {
throw convertException(e);
+ } finally {
+ if (yarnClient != null) {
+ Closeables.closeQuietly(yarnClient);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/8d60f7f2/core/src/test/java/org/apache/oozie/action/hadoop/BlockingMapper.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/BlockingMapper.java b/core/src/test/java/org/apache/oozie/action/hadoop/BlockingMapper.java
new file mode 100644
index 0000000..0f4dcd6
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/BlockingMapper.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+// Test mapper whose map() parks in wait() on its own monitor; nothing in this class ever calls notify()
+public class BlockingMapper implements Mapper<Object, Object, Object, Object> {
+
+ @Override
+ public void configure(JobConf job) {
+ // nop - nothing is read from the job configuration
+ }
+
+ @Override
+ public void close() throws IOException {
+ // nop - the class has no fields, so there is nothing to release
+ }
+
+ @Override
+ public void map(Object key, Object value, OutputCollector<Object, Object> output, Reporter reporter)
+ throws IOException {
+ try {
+ synchronized (this) { // wait() has to be invoked while holding this object's monitor
+ wait(); // NOTE(review): not guarded by a loop, so a spurious wakeup would let map() return - confirm
+ }
+ } catch (InterruptedException e) { // NOTE(review): interrupt status is not restored before rethrowing
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/8d60f7f2/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java b/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java
index 8f08ddd..75ac716 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.Reducer;
import java.io.IOException;
import java.util.Iterator;
-public class MapperReducerForTest implements Mapper, Reducer {
+public class MapperReducerForTest implements Mapper<Object, Object, Object, Object>, Reducer<Object, Object, Object, Object> {
public static final String GROUP = "g";
public static final String NAME = "c";
/**
@@ -66,14 +66,14 @@ public class MapperReducerForTest implements Mapper, Reducer {
public void close() throws IOException {
}
- @SuppressWarnings("unchecked")
- public void map(Object key, Object value, OutputCollector collector, Reporter reporter) throws IOException {
+ @Override
+ public void map(Object key, Object value, OutputCollector<Object, Object> collector, Reporter reporter) throws IOException {
collector.collect(key, value);
reporter.incrCounter(GROUP, NAME, 5l);
}
- @SuppressWarnings("unchecked")
- public void reduce(Object key, Iterator values, OutputCollector collector, Reporter reporter)
+ @Override
+ public void reduce(Object key, Iterator<Object> values, OutputCollector<Object, Object> collector, Reporter reporter)
throws IOException {
while (values.hasNext()) {
collector.collect(key, values.next());
http://git-wip-us.apache.org/repos/asf/oozie/blob/8d60f7f2/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index a21b7c7..78936c4 100644
--- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.streaming.StreamJob;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
@@ -494,6 +495,12 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
assertTrue(MapperReducerCredentialsForTest.hasCredentials(mrJob));
}
+ protected XConfiguration getSleepMapReduceConfig(String inputDir, String outputDir) { // config for a blocking MR job
+ XConfiguration conf = getMapReduceConfig(inputDir, outputDir); // start from the regular test configuration
+ conf.set("mapred.mapper.class", BlockingMapper.class.getName()); // override the mapper with BlockingMapper
+ return conf;
+ }
+
protected XConfiguration getMapReduceConfig(String inputDir, String outputDir) {
XConfiguration conf = new XConfiguration();
conf.set("mapred.mapper.class", MapperReducerForTest.class.getName());
@@ -654,6 +661,44 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
assertTrue(errorProps.getProperty("exception.stacktrace").startsWith(OozieActionConfiguratorException.class.getName()));
}
+ public void testMapReduceActionKill() throws Exception { // kill() must leave the child MR job in state KILLED
+ FileSystem fs = getFileSystem();
+
+ Path inputDir = new Path(getFsTestCaseDir(), "input");
+ Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
+
+ String actionXml = "<map-reduce>" + "<job-tracker>" + getResourceManagerUri() + "</job-tracker>" + "<name-node>"
+ + getNameNodeUri() + "</name-node>"
+ + getSleepMapReduceConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
+
+ Context context = createContext(MAP_REDUCE, actionXml);
+ final String launcherId = submitAction(context);
+ // wait until the LauncherAM terminates - the MR job keeps running in the background
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
+ MapReduceActionExecutor mae = new MapReduceActionExecutor();
+ mae.check(context, context.getAction()); // must be called so that externalChildIDs are read from HDFS
+ JobConf conf = mae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
+ String user = conf.get("user.name");
+ JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+ final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); // child MR job handle
+
+ mae.kill(context, context.getAction());
+
+ waitFor(10_000, new Predicate() { // poll until the MR job reports completion; 10_000 is presumably a timeout in ms - confirm
+ @Override
+ public boolean evaluate() throws Exception {
+ return mrJob.isComplete();
+ }
+ });
+ assertEquals(JobStatus.State.KILLED, mrJob.getJobStatus().getState());
+ }
+
public void testMapReduceWithCredentials() throws Exception {
FileSystem fs = getFileSystem();