Posted to commits@oozie.apache.org by ge...@apache.org on 2016/10/10 11:52:34 UTC

[35/50] [abbrv] oozie git commit: OOZIE-2595 Make Pig action work, fix test cases

OOZIE-2595 Make Pig action work, fix test cases

Change-Id: I256d90652d116b83a5a8ced1fb23839de7e6aa70


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/11a84295
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/11a84295
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/11a84295

Branch: refs/heads/oya
Commit: 11a84295a80da0707699a52532ff5630baf99555
Parents: ca7e56f
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Mon Sep 26 14:20:04 2016 +0200
Committer: Peter Cseh <ge...@cloudera.com>
Committed: Mon Sep 26 15:25:51 2016 +0200

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java | 18 +++++--
 .../oozie/service/HadoopAccessorService.java    | 17 ++++---
 .../apache/oozie/action/hadoop/LauncherAM.java  |  6 +++
 .../action/hadoop/TestPigActionExecutor.java    | 52 +++++++-------------
 .../hadoop/TestMapReduceActionExecutor.java     | 20 --------
 5 files changed, 49 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 8637f64..8b5f2b0 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -242,7 +242,9 @@ public class JavaActionExecutor extends ActionExecutor {
         }
         else {
             conf = new JobConf(false);
+            // conf.set(HadoopAccessorService.OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, "true");
         }
+
         conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
         conf.set(HADOOP_YARN_RM, jobTracker);
         conf.set(HADOOP_NAME_NODE, nameNode);
@@ -1485,13 +1487,21 @@ public class JavaActionExecutor extends ActionExecutor {
                                         " action data.  Failing this action!", action.getExternalId(), action.getId());
                     }
                 }
-                String externalIDs = actionData.get(LauncherAM.ACTION_DATA_NEW_ID);  // MapReduce was launched
+
+                String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID);  // MapReduce was launched
+                if (externalID != null) {
+                    context.setExternalChildIDs(externalID);
+                    LOG.info(XLog.STD, "Hadoop Job was launched : [{0}]", externalID);
+                }
+
+               // Multiple child IDs - Pig or Hive action
+                String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
                 if (externalIDs != null) {
                     context.setExternalChildIDs(externalIDs);
-                    LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
+                    LOG.info(XLog.STD, "External Child IDs  : [{0}]", externalIDs);
                 }
-                LOG.info(XLog.STD, "action completed, external ID [{0}]",
-                        action.getExternalId());
+
+                LOG.info(XLog.STD, "action completed, external ID [{0}]", action.getExternalId());
                 context.setExecutionData(appStatus.toString(), null);
                 if (appStatus == FinalApplicationStatus.SUCCEEDED) {
                     if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
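
For context, here is a minimal, self-contained sketch of the completion handling the hunk above introduces. The key values and the plain Map below are placeholders standing in for Oozie's real constants and action-data plumbing: a single ID under ACTION_DATA_NEW_ID means a plain MapReduce job was launched, while ACTION_DATA_EXTERNAL_CHILD_IDS carries the comma-separated list written by multi-job actions such as Pig or Hive.

    import java.util.HashMap;
    import java.util.Map;

    public class ExternalIdHandlingSketch {
        // Placeholder key names; the real constants live in LauncherAM.
        static final String ACTION_DATA_NEW_ID = "newId";
        static final String ACTION_DATA_EXTERNAL_CHILD_IDS = "externalChildIDs";

        static void recordIds(Map<String, String> actionData) {
            String externalID = actionData.get(ACTION_DATA_NEW_ID);
            if (externalID != null) {
                System.out.println("Hadoop Job was launched : [" + externalID + "]");
            }
            String externalIDs = actionData.get(ACTION_DATA_EXTERNAL_CHILD_IDS);
            if (externalIDs != null) {
                System.out.println("External Child IDs : [" + externalIDs + "]");
            }
        }

        public static void main(String[] args) {
            Map<String, String> actionData = new HashMap<String, String>();
            actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, "job_0001,job_0002");
            recordIds(actionData);
        }
    }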

http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index 0177241..5845e17 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -86,17 +86,16 @@ public class HadoopAccessorService implements Service {
     public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
     public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");
 
-    protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
     /** The Kerberos principal for the job tracker.*/
     protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
     /** The Kerberos principal for the resource manager.*/
     protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
     protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
-    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();
-
-    private static Configuration cachedConf;
 
+    private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
+    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();
     private static final String DEFAULT_ACTIONNAME = "default";
+    private static Configuration cachedConf;
 
     private Set<String> jobTrackerWhitelist = new HashSet<String>();
     private Set<String> nameNodeWhitelist = new HashSet<String>();
@@ -564,8 +563,14 @@ public class HadoopAccessorService implements Service {
      */
     public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
             throws HadoopAccessorException {
+       return createFileSystem(user, uri, conf, true);
+    }
+
+    private FileSystem createFileSystem(String user, final URI uri, final Configuration conf, boolean checkAccessorProperty)
+            throws HadoopAccessorException {
         ParamChecker.notEmpty(user, "user");
-        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
+
+        if (checkAccessorProperty && !conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
             throw new HadoopAccessorException(ErrorCode.E0903);
         }
 
@@ -750,7 +755,7 @@ public class HadoopAccessorService implements Service {
                 fos.close();
             }
         }
-        FileSystem fs = createFileSystem(user, uri, conf);
+        FileSystem fs = createFileSystem(user, uri, conf, false);
         Path dst = new Path(dir, filename);
         fs.copyFromLocalFile(new Path(f.getAbsolutePath()), dst);
         LocalResource localResource = Records.newRecord(LocalResource.class);
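
For context, a simplified sketch (assumed types, not the Oozie API) of the overload pattern these hunks introduce: the public createFileSystem keeps the E0903 guard on the created-by-HadoopAccessorService flag, while the internal caller that has just written the file for a LocalResource delegates to a private variant that skips the check because it built the configuration itself.

    import java.util.Properties;

    public class GuardedCreateSketch {
        private static final String CREATED_FLAG = "oozie.HadoopAccessorService.created";

        // Public entry point: always enforce that the configuration came from this service.
        public Object createFileSystem(String user, Properties conf) {
            return createFileSystem(user, conf, true);
        }

        // Private variant: a trusted internal caller may skip the flag check.
        private Object createFileSystem(String user, Properties conf, boolean checkAccessorProperty) {
            if (user == null || user.isEmpty()) {
                throw new IllegalArgumentException("user must not be empty");
            }
            if (checkAccessorProperty && !Boolean.parseBoolean(conf.getProperty(CREATED_FLAG, "false"))) {
                throw new IllegalStateException("E0903: configuration was not created by HadoopAccessorService");
            }
            return new Object(); // stands in for the real FileSystem
        }
    }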

http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
index 43ce520..c923dda 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -191,8 +192,13 @@ public class LauncherAM {
                 }
             }
         } catch (Exception e) {
+            System.out.println("Launcher AM execution failed");
             System.err.println("Launcher AM execution failed");
+            e.printStackTrace(System.out);
             e.printStackTrace(System.err);
+            finalStatus = FinalApplicationStatus.FAILED;
+            eHolder.setErrorCause(e);
+            eHolder.setErrorMessage(e.getMessage());
             throw e;
         } finally {
             try {
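
For context, a compact sketch (assumed names, no YARN dependencies) of the failure handling the hunk above adds: the catch block now mirrors the message and stack trace to stdout, marks the final application status FAILED and records the cause on the error holder before rethrowing, so the reporting that runs afterwards sees a consistent failure state.

    public class LauncherFailureSketch {
        enum FinalStatus { UNDEFINED, SUCCEEDED, FAILED }

        static class ErrorHolder {
            Throwable errorCause;
            String errorMessage;
            void setErrorCause(Throwable t) { errorCause = t; }
            void setErrorMessage(String m) { errorMessage = m; }
        }

        static FinalStatus run(Runnable action) throws Exception {
            FinalStatus finalStatus = FinalStatus.UNDEFINED;
            ErrorHolder eHolder = new ErrorHolder();
            try {
                action.run();
                finalStatus = FinalStatus.SUCCEEDED;
            } catch (Exception e) {
                System.out.println("Launcher AM execution failed");
                System.err.println("Launcher AM execution failed");
                e.printStackTrace(System.out);
                e.printStackTrace(System.err);
                // Record the failure before rethrowing so later reporting sees status and cause.
                finalStatus = FinalStatus.FAILED;
                eHolder.setErrorCause(e);
                eHolder.setErrorMessage(e.getMessage());
                throw e;
            } finally {
                System.out.println("final status: " + finalStatus);
            }
            return finalStatus;
        }
    }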

http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
index 16064e7..0d0adf4 100644
--- a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
+++ b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
@@ -18,44 +18,36 @@
 
 package org.apache.oozie.action.hadoop;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.fs.Path;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.ConfigurationService;
-import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.util.IOUtils;
-import org.codehaus.jackson.JsonParser;
 import org.jdom.Element;
 import org.json.simple.JSONValue;
 import org.json.simple.parser.JSONParser;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.FileInputStream;
-import java.io.Writer;
-import java.io.OutputStreamWriter;
-import java.io.StringReader;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 public class TestPigActionExecutor extends ActionExecutorTestCase {
 
     private static final String PIG_SCRIPT = "set job.name 'test'\n" + "set debug on\n" +
@@ -147,18 +139,10 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
 
     private String submitAction(Context context) throws Exception {
         PigActionExecutor ae = new PigActionExecutor();
-
         WorkflowAction action = context.getAction();
-
         ae.prepareActionDir(getFileSystem(), context);
         ae.submitLauncher(getFileSystem(), context, action);
-
         String jobId = action.getExternalId();
-        String jobTracker = action.getTrackerUri();
-        String consoleUrl = action.getConsoleUrl();
-        assertNotNull(jobId);
-        assertNotNull(jobTracker);
-        assertNotNull(consoleUrl);
 
         return jobId;
     }
@@ -217,11 +201,11 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
         ae.check(context, wfAction);
         ae.end(context, wfAction);
 
-        assertEquals("SUCCEEDED", wfAction.getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, wfAction.getExternalStatus());
         String stats = wfAction.getStats();
         assertNotNull(stats);
         // check for some of the expected key values in the stats
-        Map m = (Map)JSONValue.parse(stats);
+        Map m = (Map) JSONValue.parse(stats);
         // check for expected 1st level JSON keys
         assertTrue(m.containsKey("PIG_VERSION"));
 
@@ -229,7 +213,7 @@ public class TestPigActionExecutor extends ActionExecutorTestCase {
         String[] childIDs = expectedChildIDs.split(",");
         assertTrue(m.containsKey(childIDs[0]));
 
-        Map q = (Map)m.get(childIDs[0]);
+        Map q = (Map) m.get(childIDs[0]);
         // check for expected 2nd level JSON keys
         assertTrue(q.containsKey("HADOOP_COUNTERS"));
     }
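
For context, a small stand-alone sketch of the nested json-simple parsing this test relies on. The stats string below is invented, but its shape mirrors what the assertions check: a top-level PIG_VERSION key plus one entry per child job ID, each of which carries a HADOOP_COUNTERS map.

    import java.util.Map;
    import org.json.simple.JSONValue;

    public class PigStatsParseSketch {
        public static void main(String[] args) {
            // Invented sample; the real string comes from wfAction.getStats() after the Pig action ends.
            String stats = "{\"PIG_VERSION\":\"0.16.0\","
                    + "\"job_201609261420_0001\":{\"HADOOP_COUNTERS\":{\"RECORDS_WRITTEN\":\"42\"}}}";
            Map m = (Map) JSONValue.parse(stats);          // 1st level: version + child job IDs
            Map q = (Map) m.get("job_201609261420_0001");  // 2nd level: per-job details
            System.out.println(m.containsKey("PIG_VERSION") && q.containsKey("HADOOP_COUNTERS"));
        }
    }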

http://git-wip-us.apache.org/repos/asf/oozie/blob/11a84295/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index 53330ce..39ee0bc 100644
--- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -395,26 +395,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         ae.prepareActionDir(getFileSystem(), context);
         ae.submitLauncher(getFileSystem(), context, action);
 
-        String jobId = action.getExternalId();
-        String jobTracker = action.getTrackerUri();
-        String consoleUrl = action.getConsoleUrl();
-        assertNotNull(jobId);
-        assertNotNull(jobTracker);
-        assertNotNull(consoleUrl);
-
-        Element e = XmlUtils.parseXml(action.getConf());
-        XConfiguration conf = new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration"))
-                .toString()));
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker"));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node"));
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("group.name", getTestGroup());
-        conf.set("mapreduce.framework.name", "yarn");
-
-        JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-
-        ae.submitLauncher(getFileSystem(), context, context.getAction());
         return context.getAction().getExternalId();
     }