You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text copy.)
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/08/25 11:03:22 UTC
falcon git commit: FALCON-1038 Log mover fails for map-reduce action.
Contributed by Peeyush Bishnoi.
Repository: falcon
Updated Branches:
refs/heads/master fd50217e6 -> 9da5c5643
FALCON-1038 Log mover fails for map-reduce action. Contributed by Peeyush Bishnoi.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9da5c564
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9da5c564
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9da5c564
Branch: refs/heads/master
Commit: 9da5c56439403601367672cb165112a873733e6f
Parents: fd50217
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Aug 25 14:00:37 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Aug 25 14:00:37 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../falcon/logging/DefaultTaskLogRetriever.java | 2 +-
.../org/apache/falcon/logging/JobLogMover.java | 47 ++++++++++++--------
.../falcon/logging/TaskLogRetrieverYarn.java | 11 ++++-
4 files changed, 42 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/9da5c564/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1054fe..f7e9127 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -91,6 +91,8 @@ Trunk (Unreleased)
(Suhas Vasu)
BUG FIXES
+ FALCON-1038 Log mover fails for map-reduce action(Peeyush Bishnoi via Ajay Yadava)
+
FALCON-1412 Process waits indefinitely and finally timedout even though missing dependencies are met(Pallavi Rao via Ajay Yadava)
FALCON-1409 Update API throws NullPointerException(Sandeep Samudrala via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/9da5c564/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
index 962f891..82448d8 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
@@ -61,7 +61,7 @@ public class DefaultTaskLogRetriever extends Configured implements TaskLogURLRet
}
}
- protected List<String> getFromHistory(String jodId) throws IOException {
+ protected List<String> getFromHistory(String jobId) throws IOException {
return null;
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9da5c564/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index ba669c8..478d68c 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -91,17 +91,26 @@ public class JobLogMover {
}
}
} else {
- // if process wf with oozie engine
- String subflowId = jobInfo.getExternalId();
- copyOozieLog(client, fs, path, subflowId);
- WorkflowJob subflowInfo = client.getJobInfo(subflowId);
+ String flowId;
+ // if process wf with pig, hive
+ if (context.getUserWorkflowEngine().equals("pig")
+ ||context.getUserWorkflowEngine().equals("hive")) {
+ flowId = jobInfo.getId();
+ } else {
+ // if process wf with oozie engine
+ flowId = jobInfo.getExternalId();
+ }
+ copyOozieLog(client, fs, path, flowId);
+ WorkflowJob subflowInfo = client.getJobInfo(flowId);
List<WorkflowAction> actions = subflowInfo.getActions();
for (WorkflowAction action : actions) {
- if (action.getType().equals("pig")
- || action.getType().equals("java")) {
+ if (isActionTypeSupported(action)) {
+ LOG.info("Copying hadoop TT log for action: {} of type: {}",
+ action.getName(), action.getType());
copyTTlogs(fs, path, action);
} else {
- LOG.info("Ignoring hadoop TT log for non-pig and non-java action: {}", action.getName());
+ LOG.info("Ignoring hadoop TT log for non supported action: {} of type: {}",
+ action.getName(), action.getType());
}
}
}
@@ -114,8 +123,8 @@ public class JobLogMover {
}
private boolean notUserWorkflowEngineIsOozie(String userWorkflowEngine) {
- // userWorkflowEngine will be null for replication and "pig" for pig
- return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) != EngineType.OOZIE;
+ // userWorkflowEngine will be null for replication and "not null" for pig, hive, oozie
+ return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) == null;
}
private void copyOozieLog(OozieClient client, FileSystem fs, Path path,
@@ -134,7 +143,7 @@ public class JobLogMover {
for (String ttLogURL : ttLogUrls) {
LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(), ttLogURL);
InputStream in = getURLinputStream(new URL(ttLogURL));
- OutputStream out = fs.create(new Path(path, action.getName() + "_"
+ OutputStream out = fs.create(new Path(path, action.getName() + "_" + action.getType() + "_"
+ getMappedStatus(action.getStatus()) + "-" + index + ".log"));
IOUtils.copyBytes(in, out, 4096, true);
LOG.info("Copied log to {}", path);
@@ -143,6 +152,13 @@ public class JobLogMover {
}
}
+ private boolean isActionTypeSupported(WorkflowAction action) {
+ return action.getType().equals("pig")
+ || action.getType().equals("hive")
+ || action.getType().equals("java")
+ || action.getType().equals("map-reduce");
+ }
+
private String getMappedStatus(WorkflowAction.Status status) {
if (status == WorkflowAction.Status.FAILED
|| status == WorkflowAction.Status.KILLED
@@ -161,14 +177,9 @@ public class JobLogMover {
@SuppressWarnings("unchecked")
private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName(Configuration conf) {
- try {
- if (YARN.equals(conf.get(MAPREDUCE_FRAMEWORK))) {
- return TaskLogRetrieverYarn.class;
- }
- return (Class<? extends TaskLogURLRetriever>)
- Class.forName("org.apache.falcon.logging.v1.TaskLogRetrieverV1");
- } catch (ClassNotFoundException e) {
- LOG.warn("V1 Retriever missing, falling back to Default retriever");
+ if (YARN.equals(conf.get(MAPREDUCE_FRAMEWORK))) {
+ return TaskLogRetrieverYarn.class;
+ } else {
return DefaultTaskLogRetriever.class;
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9da5c564/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
index 61c5afb..146d53c 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
@@ -48,13 +48,22 @@ public class TaskLogRetrieverYarn extends DefaultTaskLogRetriever {
LOG.warn("External id for workflow action is null");
return null;
}
+
+ if (conf.get(YARN_LOG_SERVER_URL) == null) {
+ LOG.warn("YARN log Server is null");
+ return null;
+ }
+
try {
Job job = cluster.getJob(jobID);
if (job != null) {
TaskCompletionEvent[] events = job.getTaskCompletionEvents(0);
for (TaskCompletionEvent event : events) {
LogParams params = cluster.getLogParams(jobID, event.getTaskAttemptId());
- String url = SCHEME + conf.get(YARN_LOG_SERVER_URL) + "/"
+ String url = (conf.get(YARN_LOG_SERVER_URL).startsWith(SCHEME)
+ ? conf.get(YARN_LOG_SERVER_URL)
+ : SCHEME + conf.get(YARN_LOG_SERVER_URL))
+ + "/"
+ event.getTaskTrackerHttp() + "/"
+ params.getContainerId() + "/"
+ params.getApplicationId() + "/"