Posted to commits@oozie.apache.org by vi...@apache.org on 2012/10/12 02:55:54 UTC

svn commit: r1397401 - in /oozie/trunk: ./ core/src/main/java/org/apache/oozie/action/hadoop/ core/src/test/java/org/apache/oozie/action/hadoop/ docs/src/site/twiki/

Author: virag
Date: Fri Oct 12 00:55:53 2012
New Revision: 1397401

URL: http://svn.apache.org/viewvc?rev=1397401&view=rev
Log:
OOZIE-1012 Sqoop jobs are unable to utilize Hadoop Counters (jarcec via virag)

Modified:
    oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
    oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
    oozie/trunk/docs/src/site/twiki/DG_SqoopActionExtension.twiki
    oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
    oozie/trunk/release-log.txt

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java?rev=1397401&r1=1397400&r2=1397401&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java Fri Oct 12 00:55:53 2012
@@ -19,22 +19,33 @@ package org.apache.oozie.action.hadoop;
 
 import java.io.IOException;
 import java.io.StringReader;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
+import org.apache.oozie.util.XLog;
 import org.jdom.Element;
 import org.jdom.JDOMException;
 import org.jdom.Namespace;
 
 public class SqoopActionExecutor extends JavaActionExecutor {
 
+    public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
 
     public SqoopActionExecutor() {
         super("sqoop");
@@ -107,6 +118,138 @@ public class SqoopActionExecutor extends
         return actionConf;
     }
 
+    /**
+     * Gather the counters from all Hadoop jobs executed by the action (i.e. the
+     * jobs that moved data, not the launcher itself) and merge them together.
+     * Most of the time there is only one such job; the exception is the
+     * import-all-tables command, which executes one job per imported table.
+     *
+     * @param context Action context
+     * @param action Workflow action
+     * @throws ActionExecutorException
+     */
+    @Override
+    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
+        super.end(context, action);
+        JobClient jobClient = null;
+
+        boolean exception = false;
+        try {
+            if (action.getStatus() == WorkflowAction.Status.OK) {
+                Element actionXml = XmlUtils.parseXml(action.getConf());
+                JobConf jobConf = createBaseHadoopConf(context, actionXml);
+                jobClient = createJobClient(context, jobConf);
+
+                // Cumulative counters for all Sqoop mapreduce jobs
+                Counters counters = null;
+
+                String externalIds = action.getExternalChildIDs();
+                String[] jobIds = externalIds.split(",");
+
+                for (String jobId : jobIds) {
+                    RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
+                    if (runningJob == null) {
+                        throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "SQOOP001",
+                                "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!",
+                                jobId, action.getId());
+                    }
+
+                    Counters taskCounters = runningJob.getCounters();
+                    if (taskCounters != null) {
+                        if (counters == null) {
+                            counters = taskCounters;
+                        } else {
+                            counters.incrAllCounters(taskCounters);
+                        }
+                    } else {
+                        XLog.getLog(getClass()).warn("Could not find Hadoop Counters for job: [{0}]", jobId);
+                    }
+                }
+
+                if (counters != null) {
+                    ActionStats stats = new MRStats(counters);
+                    String statsJsonString = stats.toJSON();
+                    context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, statsJsonString);
+
+                    // If action stats write property is set to false by user or
+                    // size of stats is greater than the maximum allowed size,
+                    // do not store the action stats
+                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
+                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true"))
+                            && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
+                        context.setExecutionStats(statsJsonString);
+                        log.debug("Printing stats for sqoop action as a JSON string : [{0}]", statsJsonString);
+                    }
+                } else {
+                    context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, "");
+                    XLog.getLog(getClass()).warn("Can't find any associated Hadoop job counters");
+                }
+            }
+        }
+        catch (Exception ex) {
+            exception = true;
+            throw convertException(ex);
+        }
+        finally {
+            if (jobClient != null) {
+                try {
+                    jobClient.close();
+                }
+                catch (Exception e) {
+                    if (exception) {
+                        log.error("JobClient error: ", e);
+                    }
+                    else {
+                        throw convertException(e);
+                    }
+                }
+            }
+        }
+    }
+
+    // Return the value of the specified configuration property
+    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
+            throws ActionExecutorException {
+        try {
+            if (actionConf != null) {
+                Namespace ns = actionConf.getNamespace();
+                Element e = actionConf.getChild("configuration", ns);
+
+                if (e != null) {
+                    String strConf = XmlUtils.prettyPrint(e).toString();
+                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
+                    return inlineConf.get(key, defaultValue);
+                }
+            }
+            return defaultValue;
+        }
+        catch (IOException ex) {
+            throw convertException(ex);
+        }
+    }
+
+    /**
+     * Get the stats and external child IDs
+     *
+     * @param actionFs the FileSystem object
+     * @param runningJob the runningJob
+     * @param action the Workflow action
+     * @param context executor context
+     *
+     */
+    @Override
+    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+            throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
+        super.getActionData(actionFs, runningJob, action, context);
+
+        // Load the stored Hadoop job ids and promote them as external child ids
+        Properties props = new Properties();
+        props.load(new StringReader(action.getData()));
+        context.setExternalChildIDs((String) props.get(LauncherMain.HADOOP_JOBS));
+    }
+
     @Override
     protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
         return true;

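As context for the change above: the heart of the new end() override is the merge of per-job
counters into a single cumulative Counters object. Below is a minimal standalone sketch of that
pattern, assuming the old org.apache.hadoop.mapred API the executor uses; the CounterMergeSketch
class and mergeCounters method are illustrative names, not part of the patch.

    import java.io.IOException;

    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.RunningJob;

    /** Sketch: merge the counters of all child Hadoop jobs into one total. */
    public class CounterMergeSketch {
        static Counters mergeCounters(JobClient jobClient, String[] jobIds) throws IOException {
            Counters cumulative = null;
            for (String jobId : jobIds) {
                RunningJob job = jobClient.getJob(JobID.forName(jobId));
                if (job == null) {
                    continue; // the real executor fails the action at this point
                }
                Counters jobCounters = job.getCounters();
                if (jobCounters == null) {
                    continue; // the real executor only logs a warning here
                }
                if (cumulative == null) {
                    cumulative = jobCounters;                // the first job seeds the total
                } else {
                    cumulative.incrAllCounters(jobCounters); // later jobs are added in
                }
            }
            return cumulative;
        }
    }

Seeding the total with the first job's Counters object avoids building counter groups by hand;
incrAllCounters then adds every counter of every group from the remaining jobs.
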
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java?rev=1397401&r1=1397400&r2=1397401&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java Fri Oct 12 00:55:53 2012
@@ -175,8 +175,14 @@ public class TestSqoopActionExecutor ext
         assertTrue(launcherId.equals(context.getAction().getExternalId()));
         assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
         assertNotNull(context.getAction().getData());
+        assertNotNull(context.getAction().getExternalChildIDs());
         ae.end(context, context.getAction());
         assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
+
+        String hadoopCounters = context.getVar(MapReduceActionExecutor.HADOOP_COUNTERS);
+        assertNotNull(hadoopCounters);
+        assertFalse(hadoopCounters.isEmpty());
+
         FileSystem fs = getFileSystem();
         BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(getSqoopOutputDir(), "part-m-00000"))));
         int count = 0;
@@ -216,8 +222,14 @@ public class TestSqoopActionExecutor ext
         assertTrue(launcherId.equals(context.getAction().getExternalId()));
         assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
         assertNotNull(context.getAction().getData());
+        assertNotNull(context.getAction().getExternalChildIDs());
         ae.end(context, context.getAction());
         assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
+
+        String hadoopCounters = context.getVar(MapReduceActionExecutor.HADOOP_COUNTERS);
+        assertNotNull(hadoopCounters);
+        assertFalse(hadoopCounters.isEmpty());
+
         FileSystem fs = getFileSystem();
         FileStatus[] parts = fs.listStatus(new Path(getSqoopOutputDir()), new PathFilter() {
             @Override

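The new assertions rely on end() publishing the merged counters as a JSON string under the
MapReduceActionExecutor.HADOOP_COUNTERS variable. A hedged sketch of how a consumer could read
that string back, using the json-simple library Oozie already depends on (the group and counter
names are illustrative and vary across Hadoop versions):

    import org.json.simple.JSONObject;
    import org.json.simple.JSONValue;

    /** Sketch: read a single counter back out of the published JSON string. */
    public class CountersJsonSketch {
        static Long hdfsBytesWritten(String hadoopCounters) {
            // The top level maps counter group names to {counterName: value} objects.
            JSONObject groups = (JSONObject) JSONValue.parse(hadoopCounters);
            JSONObject fs = (JSONObject) groups.get("FileSystemCounters");  // illustrative group
            return fs == null ? null : (Long) fs.get("HDFS_BYTES_WRITTEN"); // illustrative counter
        }
    }
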
Modified: oozie/trunk/docs/src/site/twiki/DG_SqoopActionExtension.twiki
URL: http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/DG_SqoopActionExtension.twiki?rev=1397401&r1=1397400&r2=1397401&view=diff
==============================================================================
--- oozie/trunk/docs/src/site/twiki/DG_SqoopActionExtension.twiki (original)
+++ oozie/trunk/docs/src/site/twiki/DG_SqoopActionExtension.twiki Fri Oct 12 00:55:53 2012
@@ -170,6 +170,14 @@ The same Sqoop action using =arg= elemen
 NOTE: The =arg= elements syntax, while more verbose, allows a single argument to contain spaces, which is
 useful when using free-form queries.
 
+---+++ Sqoop Action Counters
+
+The counters of the map-reduce job run by the Sqoop action are available to be used in the workflow via the
+[[WorkflowFunctionalSpec#HadoopCountersEL][hadoop:counters() EL function]].
+
+If the Sqoop action runs an import-all-tables command, the =hadoop:counters()= EL function returns the
+aggregated counters of all map-reduce jobs run by that command.
+
 ---+++ Sqoop Action Logging
 
 Sqoop action logs are redirected to the Oozie Launcher map-reduce job task STDOUT/STDERR that runs Sqoop.

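As an illustration of the new counters section (the action name and counter names here are
invented for the example), a later action in the workflow could read one of the merged values
with an expression such as
${hadoop:counters('sqoop-import')['FileSystemCounters']['HDFS_BYTES_WRITTEN']}.
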
Modified: oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
URL: http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki?rev=1397401&r1=1397400&r2=1397401&view=diff
==============================================================================
--- oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki (original)
+++ oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki Fri Oct 12 00:55:53 2012
@@ -1688,6 +1688,7 @@ not completed yet.
 
 ---++++ 4.2.5 Hadoop EL Functions
 
+#HadoopCountersEL
 *Map < String, Map < String, Long > > hadoop:counters(String node)*
 
 It returns the counters for a job submitted by a Hadoop action node. It returns =0= if the Hadoop job has not

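For a Sqoop action that launched several map-reduce jobs (the import-all-tables case documented
in DG_SqoopActionExtension above), the map returned by hadoop:counters() holds the merged totals
produced by the new SqoopActionExecutor.end() logic.
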
Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1397401&r1=1397400&r2=1397401&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Fri Oct 12 00:55:53 2012
@@ -1,5 +1,6 @@
 -- Oozie 3.4.0 release (trunk - unreleased)
 
+OOZIE-1012 Sqoop jobs are unable to utilize Hadoop Counters (jarcec via virag)
 OOZIE-986 Oozie client shell script should use consistent naming for java options (stevenwillis via tucu)
 OOZIE-1018 Display coord job start time, end time, pause time, concurrency in job -info (mona via tucu)
 OOZIE-1016 Tests that use junit assert or fail in a new thread report success when they are actually failing (rkanter via tucu)