You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2016/10/10 11:52:34 UTC
[35/50] [abbrv] oozie git commit: OOZIE-2595 Make Pig action work, fix test cases
OOZIE-2595 Make Pig action work, fix test cases
Change-Id: I256d90652d116b83a5a8ced1fb23839de7e6aa70
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/11a84295
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/11a84295
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/11a84295
Branch: refs/heads/oya
Commit: 11a84295a80da0707699a52532ff5630baf99555
Parents: ca7e56f
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Mon Sep 26 14:20:04 2016 +0200
Committer: Peter Cseh <ge...@cloudera.com>
Committed: Mon Sep 26 15:25:51 2016 +0200
----------------------------------------------------------------------
.../oozie/action/hadoop/JavaActionExecutor.java | 18 +++++--
.../oozie/service/HadoopAccessorService.java | 17 ++++---
.../apache/oozie/action/hadoop/LauncherAM.java | 6 +++
.../action/hadoop/TestPigActionExecutor.java | 52 +++++++-------------
.../hadoop/TestMapReduceActionExecutor.java | 20 --------
5 files changed, 49 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 8637f64..8b5f2b0 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -242,7 +242,9 @@ public class JavaActionExecutor extends ActionExecutor {
}
else {
conf = new JobConf(false);
+ // conf.set(HadoopAccessorService.OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, "true");
}
+
conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
conf.set(HADOOP_YARN_RM, jobTracker);
conf.set(HADOOP_NAME_NODE, nameNode);
@@ -1485,13 +1487,21 @@ public class JavaActionExecutor extends ActionExecutor {
" action data. Failing this action!", action.getExternalId(), action.getId());
}
}
- String externalIDs = actionData.get(LauncherAM.ACTION_DATA_NEW_ID); // MapReduce was launched
+
+ String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID); // MapReduce was launched
+ if (externalID != null) {
+ context.setExternalChildIDs(externalID);
+ LOG.info(XLog.STD, "Hadoop Job was launched : [{0}]", externalID);
+ }
+
+ // Multiple child IDs - Pig or Hive action
+ String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
if (externalIDs != null) {
context.setExternalChildIDs(externalIDs);
- LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
+ LOG.info(XLog.STD, "External Child IDs : [{0}]", externalIDs);
}
- LOG.info(XLog.STD, "action completed, external ID [{0}]",
- action.getExternalId());
+
+ LOG.info(XLog.STD, "action completed, external ID [{0}]", action.getExternalId());
context.setExecutionData(appStatus.toString(), null);
if (appStatus == FinalApplicationStatus.SUCCEEDED) {
if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index 0177241..5845e17 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -86,17 +86,16 @@ public class HadoopAccessorService implements Service {
public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");
- protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
/** The Kerberos principal for the job tracker.*/
protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
/** The Kerberos principal for the resource manager.*/
protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
- private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();
-
- private static Configuration cachedConf;
+ private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
+ private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();
private static final String DEFAULT_ACTIONNAME = "default";
+ private static Configuration cachedConf;
private Set<String> jobTrackerWhitelist = new HashSet<String>();
private Set<String> nameNodeWhitelist = new HashSet<String>();
@@ -564,8 +563,14 @@ public class HadoopAccessorService implements Service {
*/
public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
throws HadoopAccessorException {
+ return createFileSystem(user, uri, conf, true);
+ }
+
+ private FileSystem createFileSystem(String user, final URI uri, final Configuration conf, boolean checkAccessorProperty)
+ throws HadoopAccessorException {
ParamChecker.notEmpty(user, "user");
- if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
+
+ if (checkAccessorProperty && !conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
throw new HadoopAccessorException(ErrorCode.E0903);
}
@@ -750,7 +755,7 @@ public class HadoopAccessorService implements Service {
fos.close();
}
}
- FileSystem fs = createFileSystem(user, uri, conf);
+ FileSystem fs = createFileSystem(user, uri, conf, false);
Path dst = new Path(dir, filename);
fs.copyFromLocalFile(new Path(f.getAbsolutePath()), dst);
LocalResource localResource = Records.newRecord(LocalResource.class);
http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
index 43ce520..c923dda 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -191,8 +192,13 @@ public class LauncherAM {
}
}
} catch (Exception e) {
+ System.out.println("Launcher AM execution failed");
System.err.println("Launcher AM execution failed");
+ e.printStackTrace(System.out);
e.printStackTrace(System.err);
+ finalStatus = FinalApplicationStatus.FAILED;
+ eHolder.setErrorCause(e);
+ eHolder.setErrorMessage(e.getMessage());
throw e;
} finally {
try {
http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
index 16064e7..0d0adf4 100644
--- a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
+++ b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
@@ -18,44 +18,36 @@
package org.apache.oozie.action.hadoop;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.fs.Path;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
-import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.util.IOUtils;
-import org.codehaus.jackson.JsonParser;
import org.jdom.Element;
import org.json.simple.JSONValue;
import org.json.simple.parser.JSONParser;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.FileInputStream;
-import java.io.Writer;
-import java.io.OutputStreamWriter;
-import java.io.StringReader;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
public class TestPigActionExecutor extends ActionExecutorTestCase {
private static final String PIG_SCRIPT = "set job.name 'test'\n" + "set debug on\n" +
@@ -147,18 +139,10 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
private String submitAction(Context context) throws Exception {
PigActionExecutor ae = new PigActionExecutor();
-
WorkflowAction action = context.getAction();
-
ae.prepareActionDir(getFileSystem(), context);
ae.submitLauncher(getFileSystem(), context, action);
-
String jobId = action.getExternalId();
- String jobTracker = action.getTrackerUri();
- String consoleUrl = action.getConsoleUrl();
- assertNotNull(jobId);
- assertNotNull(jobTracker);
- assertNotNull(consoleUrl);
return jobId;
}
@@ -217,11 +201,11 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
ae.check(context, wfAction);
ae.end(context, wfAction);
- assertEquals("SUCCEEDED", wfAction.getExternalStatus());
+ assertEquals(JavaActionExecutor.SUCCEEDED, wfAction.getExternalStatus());
String stats = wfAction.getStats();
assertNotNull(stats);
// check for some of the expected key values in the stats
- Map m = (Map)JSONValue.parse(stats);
+ Map m = (Map) JSONValue.parse(stats);
// check for expected 1st level JSON keys
assertTrue(m.containsKey("PIG_VERSION"));
@@ -229,7 +213,7 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
String[] childIDs = expectedChildIDs.split(",");
assertTrue(m.containsKey(childIDs[0]));
- Map q = (Map)m.get(childIDs[0]);
+ Map q = (Map) m.get(childIDs[0]);
// check for expected 2nd level JSON keys
assertTrue(q.containsKey("HADOOP_COUNTERS"));
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index 53330ce..39ee0bc 100644
--- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -395,26 +395,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
ae.prepareActionDir(getFileSystem(), context);
ae.submitLauncher(getFileSystem(), context, action);
- String jobId = action.getExternalId();
- String jobTracker = action.getTrackerUri();
- String consoleUrl = action.getConsoleUrl();
- assertNotNull(jobId);
- assertNotNull(jobTracker);
- assertNotNull(consoleUrl);
-
- Element e = XmlUtils.parseXml(action.getConf());
- XConfiguration conf = new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration"))
- .toString()));
- conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker"));
- conf.set("fs.default.name", e.getChildTextTrim("name-node"));
- conf.set("user.name", context.getProtoActionConf().get("user.name"));
- conf.set("group.name", getTestGroup());
- conf.set("mapreduce.framework.name", "yarn");
-
- JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
- XConfiguration.copy(conf, jobConf);
-
- ae.submitLauncher(getFileSystem(), context, context.getAction());
return context.getAction().getExternalId();
}