You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/08/25 11:03:22 UTC

falcon git commit: FALCON-1038 Log mover fails for map-reduce action. Contributed by Peeyush Bishnoi.

Repository: falcon
Updated Branches:
  refs/heads/master fd50217e6 -> 9da5c5643


FALCON-1038 Log mover fails for map-reduce action. Contributed by Peeyush Bishnoi.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9da5c564
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9da5c564
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9da5c564

Branch: refs/heads/master
Commit: 9da5c56439403601367672cb165112a873733e6f
Parents: fd50217
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Aug 25 14:00:37 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Aug 25 14:00:37 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../falcon/logging/DefaultTaskLogRetriever.java |  2 +-
 .../org/apache/falcon/logging/JobLogMover.java  | 47 ++++++++++++--------
 .../falcon/logging/TaskLogRetrieverYarn.java    | 11 ++++-
 4 files changed, 42 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/9da5c564/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1054fe..f7e9127 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -91,6 +91,8 @@ Trunk (Unreleased)
     (Suhas Vasu)
 
   BUG FIXES
+    FALCON-1038 Log mover fails for map-reduce action(Peeyush Bishnoi via Ajay Yadava)
+    
     FALCON-1412 Process waits indefinitely and finally timedout even though missing dependencies are met(Pallavi Rao via Ajay Yadava)
 
     FALCON-1409 Update API throws NullPointerException(Sandeep Samudrala via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/9da5c564/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
index 962f891..82448d8 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
@@ -61,7 +61,7 @@ public class DefaultTaskLogRetriever extends Configured implements TaskLogURLRet
         }
     }
 
-    protected List<String> getFromHistory(String jodId) throws IOException {
+    protected List<String> getFromHistory(String jobId) throws IOException {
         return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9da5c564/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index ba669c8..478d68c 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -91,17 +91,26 @@ public class JobLogMover {
                     }
                 }
             } else {
-                // if process wf with oozie engine
-                String subflowId = jobInfo.getExternalId();
-                copyOozieLog(client, fs, path, subflowId);
-                WorkflowJob subflowInfo = client.getJobInfo(subflowId);
+                String flowId;
+                // if process wf with pig, hive
+                if (context.getUserWorkflowEngine().equals("pig")
+                        ||context.getUserWorkflowEngine().equals("hive")) {
+                    flowId = jobInfo.getId();
+                } else {
+                    // if process wf with oozie engine
+                    flowId = jobInfo.getExternalId();
+                }
+                copyOozieLog(client, fs, path, flowId);
+                WorkflowJob subflowInfo = client.getJobInfo(flowId);
                 List<WorkflowAction> actions = subflowInfo.getActions();
                 for (WorkflowAction action : actions) {
-                    if (action.getType().equals("pig")
-                            || action.getType().equals("java")) {
+                    if (isActionTypeSupported(action)) {
+                        LOG.info("Copying hadoop TT log for action: {} of type: {}",
+                                action.getName(), action.getType());
                         copyTTlogs(fs, path, action);
                     } else {
-                        LOG.info("Ignoring hadoop TT log for non-pig and non-java action: {}", action.getName());
+                        LOG.info("Ignoring hadoop TT log for non supported action: {} of type: {}",
+                                action.getName(), action.getType());
                     }
                 }
             }
@@ -114,8 +123,8 @@ public class JobLogMover {
     }
 
     private boolean notUserWorkflowEngineIsOozie(String userWorkflowEngine) {
-        // userWorkflowEngine will be null for replication and "pig" for pig
-        return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) != EngineType.OOZIE;
+        // userWorkflowEngine will be null for replication and "not null" for pig, hive, oozie
+        return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) == null;
     }
 
     private void copyOozieLog(OozieClient client, FileSystem fs, Path path,
@@ -134,7 +143,7 @@ public class JobLogMover {
             for (String ttLogURL : ttLogUrls) {
                 LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(), ttLogURL);
                 InputStream in = getURLinputStream(new URL(ttLogURL));
-                OutputStream out = fs.create(new Path(path, action.getName() + "_"
+                OutputStream out = fs.create(new Path(path, action.getName() + "_" + action.getType() + "_"
                         + getMappedStatus(action.getStatus()) + "-" + index + ".log"));
                 IOUtils.copyBytes(in, out, 4096, true);
                 LOG.info("Copied log to {}", path);
@@ -143,6 +152,13 @@ public class JobLogMover {
         }
     }
 
+    private boolean isActionTypeSupported(WorkflowAction action) {
+        return action.getType().equals("pig")
+                || action.getType().equals("hive")
+                || action.getType().equals("java")
+                || action.getType().equals("map-reduce");
+    }
+
     private String getMappedStatus(WorkflowAction.Status status) {
         if (status == WorkflowAction.Status.FAILED
                 || status == WorkflowAction.Status.KILLED
@@ -161,14 +177,9 @@ public class JobLogMover {
 
     @SuppressWarnings("unchecked")
     private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName(Configuration conf) {
-        try {
-            if (YARN.equals(conf.get(MAPREDUCE_FRAMEWORK))) {
-                return TaskLogRetrieverYarn.class;
-            }
-            return (Class<? extends TaskLogURLRetriever>)
-                    Class.forName("org.apache.falcon.logging.v1.TaskLogRetrieverV1");
-        } catch (ClassNotFoundException e) {
-            LOG.warn("V1 Retriever missing, falling back to Default retriever");
+        if (YARN.equals(conf.get(MAPREDUCE_FRAMEWORK))) {
+            return TaskLogRetrieverYarn.class;
+        } else {
             return DefaultTaskLogRetriever.class;
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9da5c564/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
index 61c5afb..146d53c 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
@@ -48,13 +48,22 @@ public class TaskLogRetrieverYarn extends DefaultTaskLogRetriever {
             LOG.warn("External id for workflow action is null");
             return null;
         }
+
+        if (conf.get(YARN_LOG_SERVER_URL) == null) {
+            LOG.warn("YARN log Server is null");
+            return null;
+        }
+
         try {
             Job job = cluster.getJob(jobID);
             if (job != null) {
                 TaskCompletionEvent[] events = job.getTaskCompletionEvents(0);
                 for (TaskCompletionEvent event : events) {
                     LogParams params = cluster.getLogParams(jobID, event.getTaskAttemptId());
-                    String url = SCHEME + conf.get(YARN_LOG_SERVER_URL) + "/"
+                    String url = (conf.get(YARN_LOG_SERVER_URL).startsWith(SCHEME)
+                            ? conf.get(YARN_LOG_SERVER_URL)
+                            : SCHEME + conf.get(YARN_LOG_SERVER_URL))
+                            + "/"
                             + event.getTaskTrackerHttp() + "/"
                             + params.getContainerId() + "/"
                             + params.getApplicationId() + "/"