You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/08 22:31:37 UTC
[4/8] git commit: FALCON-486 - Introduce Workflow Context in Post
Processing. Contributed by Venkatesh Seetharam
FALCON-486 - Introduce Workflow Context in Post Processing. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/c6000363
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/c6000363
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/c6000363
Branch: refs/heads/master
Commit: c6000363a19e11745d9f3f7abc14e03f08be7f88
Parents: 23762e5
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 12:58:23 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 12:58:23 2014 -0700
----------------------------------------------------------------------
.../org/apache/falcon/metadata/LineageArgs.java | 1 +
.../apache/falcon/metadata/LineageRecorder.java | 1 +
.../falcon/messaging/JMSMessageProducer.java | 57 ++---
.../org/apache/falcon/logging/JobLogMover.java | 170 ++++++++++++++
.../org/apache/falcon/logging/LogMover.java | 229 -------------------
.../falcon/workflow/FalconPostProcessing.java | 227 +++---------------
.../workflow/FalconPostProcessingTest.java | 78 +++----
.../org/apache/falcon/process/PigProcessIT.java | 2 +-
.../org/apache/falcon/util/OozieTestUtils.java | 15 +-
9 files changed, 268 insertions(+), 512 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java b/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
index 7adf5bb..e91f7ab 100644
--- a/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
+++ b/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
/**
* Args for data Lineage.
*/
+@Deprecated // delete this class
public enum LineageArgs {
// process instance
NOMINAL_TIME("nominalTime", "instance time"),
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java b/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
index 8a946ad..5ab1bf0 100644
--- a/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
@@ -46,6 +46,7 @@ import java.util.Map;
/**
* Utility called in the post process of oozie workflow to record lineage information.
*/
+@Deprecated // delete this class
public class LineageRecorder extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(LineageRecorder.class);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 6c9859c..aeadf5c 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -63,35 +63,6 @@ public class JMSMessageProducer {
private static final long DEFAULT_TTL = 3 * 24 * 60 * 60 * 1000;
- private static final WorkflowExecutionArgs[] FALCON_FILTER = {
- WorkflowExecutionArgs.NOMINAL_TIME,
- WorkflowExecutionArgs.ENTITY_NAME,
- WorkflowExecutionArgs.OPERATION,
- WorkflowExecutionArgs.LOG_DIR,
- WorkflowExecutionArgs.STATUS,
- WorkflowExecutionArgs.CONTEXT_FILE,
- WorkflowExecutionArgs.TIMESTAMP,
- };
-
- private static final WorkflowExecutionArgs[] USER_FILTER = {
- WorkflowExecutionArgs.CLUSTER_NAME,
- WorkflowExecutionArgs.ENTITY_NAME,
- WorkflowExecutionArgs.ENTITY_TYPE,
- WorkflowExecutionArgs.NOMINAL_TIME,
- WorkflowExecutionArgs.OPERATION,
-
- WorkflowExecutionArgs.FEED_NAMES,
- WorkflowExecutionArgs.FEED_INSTANCE_PATHS,
-
- WorkflowExecutionArgs.WORKFLOW_ID,
- WorkflowExecutionArgs.WORKFLOW_USER,
- WorkflowExecutionArgs.RUN_ID,
- WorkflowExecutionArgs.STATUS,
- WorkflowExecutionArgs.TIMESTAMP,
- WorkflowExecutionArgs.LOG_FILE,
- };
-
-
private final WorkflowExecutionContext context;
private final MessageType messageType;
@@ -170,9 +141,26 @@ public class JMSMessageProducer {
* Accepts a Message to be send to JMS topic, creates a new
* Topic based on topic name if it does not exist or else
* existing topic with the same name is used to send the message.
+ * Sends all arguments.
+ *
+ * @return error code
+ * @throws JMSException
*/
public int sendMessage() throws JMSException {
- List<Map<String, String>> messageList = buildMessageList();
+ return sendMessage(WorkflowExecutionArgs.values());
+ }
+
+ /**
+ * Accepts a Message to be send to JMS topic, creates a new
+ * Topic based on topic name if it does not exist or else
+ * existing topic with the same name is used to send the message.
+ *
+ * @param filteredArgs args sent in the message.
+ * @return error code
+ * @throws JMSException
+ */
+ public int sendMessage(WorkflowExecutionArgs[] filteredArgs) throws JMSException {
+ List<Map<String, String>> messageList = buildMessageList(filteredArgs);
if (messageList.isEmpty()) {
LOG.warn("No operation on output feed");
@@ -198,10 +186,7 @@ public class JMSMessageProducer {
return 0;
}
- private List<Map<String, String>> buildMessageList() {
- WorkflowExecutionArgs[] fileredArgs = messageType == MessageType.FALCON
- ? FALCON_FILTER : USER_FILTER;
-
+ private List<Map<String, String>> buildMessageList(WorkflowExecutionArgs[] filteredArgs) {
String[] feedNames = getFeedNames();
if (feedNames == null) {
return Collections.emptyList();
@@ -217,7 +202,7 @@ public class JMSMessageProducer {
List<Map<String, String>> messages = new ArrayList<Map<String, String>>(feedPaths.length);
for (int i = 0; i < feedPaths.length; i++) {
- Map<String, String> message = buildMessage(fileredArgs);
+ Map<String, String> message = buildMessage(filteredArgs);
// override default values
if (context.getEntityType().equalsIgnoreCase("PROCESS")) {
@@ -347,7 +332,7 @@ public class JMSMessageProducer {
@SuppressWarnings("unchecked")
private Connection createAndStartConnection(String implementation, String userName,
- String password, String url)
+ String password, String url)
throws JMSException, ClassNotFoundException, InstantiationException,
IllegalAccessException, InvocationTargetException, NoSuchMethodException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
new file mode 100644
index 0000000..1a08ada
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.logging;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Utility called in the post process of oozie workflow to move oozie action executor log.
+ *
+ * <p>Copies the Oozie job log, and the Hadoop task-tracker (TT) logs of selected workflow
+ * actions, into {@code <logDir>/<run id, zero-padded to 3 digits>} on that path's file system.
+ * All failures are logged and swallowed so that a failed log move never fails the user
+ * workflow.</p>
+ */
+public class JobLogMover {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobLogMover.class);
+
+    /** Names of Falcon-generated workflow actions (retention/replication) whose TT logs are copied. */
+    public static final Set<String> FALCON_ACTIONS =
+        new HashSet<String>(Arrays.asList(new String[]{"eviction", "replication", }));
+
+    /** Returns a freshly constructed Hadoop configuration on every call. */
+    private Configuration getConf() {
+        return new Configuration();
+    }
+
+    /**
+     * Moves the logs of the workflow instance described by {@code context}.
+     *
+     * @param context workflow execution context supplying the workflow engine URL, user subflow
+     *                id, log dir, workflow run id, entity type and user workflow engine
+     * @return always 0; any error is logged instead of being thrown
+     */
+    public int run(WorkflowExecutionContext context) {
+        try {
+            OozieClient client = new OozieClient(context.getWorkflowEngineUrl());
+            WorkflowJob jobInfo;
+            try {
+                jobInfo = client.getJobInfo(context.getUserSubflowId());
+            } catch (OozieClientException e) {
+                LOG.error("Error getting jobinfo for: {}", context.getUserSubflowId(), e);
+                return 0;
+            }
+
+            // target directory is <logDir>/<run id formatted as %03d>
+            Path path = new Path(context.getLogDir() + "/"
+                    + String.format("%03d", context.getWorkflowRunId()));
+            FileSystem fs = path.getFileSystem(getConf());
+
+            if (EntityType.FEED.name().equalsIgnoreCase(context.getEntityType())
+                    || notUserWorkflowEngineIsOozie(context.getUserWorkflowEngine())) {
+                // if replication wf, retention wf or PIG Process
+                copyOozieLog(client, fs, path, jobInfo.getId());
+
+                // copy the TT log of the first Falcon-owned action only
+                List<WorkflowAction> workflowActions = jobInfo.getActions();
+                for (WorkflowAction action : workflowActions) {
+                    if (FALCON_ACTIONS.contains(action.getName())) {
+                        copyTTlogs(fs, path, action);
+                        break;
+                    }
+                }
+            } else {
+                // if process wf with oozie engine: logs come from the job identified by the
+                // external id of the job looked up above
+                String subflowId = jobInfo.getExternalId();
+                copyOozieLog(client, fs, path, subflowId);
+                WorkflowJob subflowInfo = client.getJobInfo(subflowId);
+                List<WorkflowAction> actions = subflowInfo.getActions();
+                for (WorkflowAction action : actions) {
+                    // constant-first comparison tolerates a null action type
+                    if ("pig".equals(action.getType()) || "java".equals(action.getType())) {
+                        copyTTlogs(fs, path, action);
+                    } else {
+                        LOG.info("Ignoring hadoop TT log for non-pig and non-java action: {}",
+                                action.getName());
+                    }
+                }
+            }
+
+        } catch (Exception e) {
+            // JobLogMover doesn't throw exception, a failed log mover will not fail the user workflow
+            LOG.error("Exception in log mover:", e);
+        }
+        return 0;
+    }
+
+    /**
+     * Returns true when a user workflow engine is given and it is not Oozie (e.g. "pig").
+     *
+     * @param userWorkflowEngine engine name; null for replication
+     */
+    private boolean notUserWorkflowEngineIsOozie(String userWorkflowEngine) {
+        // userWorkflowEngine will be null for replication and "pig" for pig
+        return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) != EngineType.OOZIE;
+    }
+
+    /**
+     * Writes the Oozie log of job {@code id} to {@code <path>/oozie.log}.
+     */
+    private void copyOozieLog(OozieClient client, FileSystem fs, Path path,
+                              String id) throws OozieClientException, IOException {
+        // encode explicitly so the copied log does not depend on the JVM's default charset
+        InputStream in = new ByteArrayInputStream(client.getJobLog(id).getBytes("UTF-8"));
+        OutputStream out = fs.create(new Path(path, "oozie.log"));
+        IOUtils.copyBytes(in, out, 4096, true);
+        LOG.info("Copied oozie log to {}", path);
+    }
+
+    /**
+     * Fetches the TT log of {@code action} (if a log URL can be resolved) and writes it to
+     * {@code <path>/<actionName>_<FAILED|SUCCEEDED>.log}.
+     */
+    private void copyTTlogs(FileSystem fs, Path path,
+                            WorkflowAction action) throws Exception {
+        String ttLogURL = getTTlogURL(action.getExternalId());
+        if (ttLogURL == null) {
+            return;
+        }
+
+        LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(), ttLogURL);
+        InputStream in = getURLinputStream(new URL(ttLogURL));
+        OutputStream out = null;
+        try {
+            out = fs.create(new Path(path, action.getName() + "_"
+                    + getMappedStatus(action.getStatus()) + ".log"));
+        } finally {
+            // if fs.create failed, copyBytes will never run to close the URL stream, so close it here
+            if (out == null) {
+                IOUtils.closeStream(in);
+            }
+        }
+        IOUtils.copyBytes(in, out, 4096, true);
+        LOG.info("Copied log to {}", path);
+    }
+
+    /** Maps FAILED, KILLED and ERROR to "FAILED"; every other action status to "SUCCEEDED". */
+    private String getMappedStatus(WorkflowAction.Status status) {
+        if (status == WorkflowAction.Status.FAILED
+                || status == WorkflowAction.Status.KILLED
+                || status == WorkflowAction.Status.ERROR) {
+            return "FAILED";
+        } else {
+            return "SUCCEEDED";
+        }
+    }
+
+    /** Resolves the task log URL for Hadoop job {@code jobId}; callers handle a null result. */
+    private String getTTlogURL(String jobId) throws Exception {
+        TaskLogURLRetriever logRetriever = ReflectionUtils
+                .newInstance(getLogRetrieverClassName(), getConf());
+        return logRetriever.retrieveTaskLogURL(jobId);
+    }
+
+    /**
+     * Returns {@code org.apache.falcon.logging.v1.TaskLogRetrieverV1} when it is on the classpath,
+     * otherwise {@link DefaultTaskLogRetriever}.
+     */
+    @SuppressWarnings("unchecked")
+    private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName() {
+        try {
+            return (Class<? extends TaskLogURLRetriever>)
+                    Class.forName("org.apache.falcon.logging.v1.TaskLogRetrieverV1");
+        } catch (ClassNotFoundException e) {
+            LOG.warn("V1 Retriever missing, falling back to Default retriever");
+            return DefaultTaskLogRetriever.class;
+        }
+    }
+
+    /** Opens a connection to {@code url} and returns its input stream; the caller must close it. */
+    private InputStream getURLinputStream(URL url) throws IOException {
+        URLConnection connection = url.openConnection();
+        // NOTE(review): setDoOutput(true) looks unnecessary for a read-only fetch — confirm before removing
+        connection.setDoOutput(true);
+        connection.connect();
+        return connection.getInputStream();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
deleted file mode 100644
index 3922b38..0000000
--- a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.logging;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.OozieClientException;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.client.WorkflowJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URL;
-import java.net.URLConnection;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Utility called in the post process of oozie workflow to move oozie action executor log.
- */
-public class LogMover extends Configured implements Tool {
-
- private static final Logger LOG = LoggerFactory.getLogger(LogMover.class);
- public static final Set<String> FALCON_ACTIONS =
- new HashSet<String>(Arrays.asList(new String[]{"eviction", "replication", }));
-
- /**
- * Args to the command.
- */
- private static class ARGS {
- private String oozieUrl;
- private String subflowId;
- private String runId;
- private String logDir;
- private String entityType;
- private String userWorkflowEngine;
- }
-
- public static void main(String[] args) throws Exception {
- ToolRunner.run(new LogMover(), args);
- }
-
- @Override
- public int run(String[] arguments) throws Exception {
- try {
- ARGS args = new ARGS();
- setupArgs(arguments, args);
-
- OozieClient client = new OozieClient(args.oozieUrl);
- WorkflowJob jobInfo;
- try {
- jobInfo = client.getJobInfo(args.subflowId);
- } catch (OozieClientException e) {
- LOG.error("Error getting jobinfo for: {}", args.subflowId, e);
- return 0;
- }
-
- Path path = new Path(args.logDir + "/"
- + String.format("%03d", Integer.parseInt(args.runId)));
- FileSystem fs = path.getFileSystem(getConf());
-
- if (args.entityType.equalsIgnoreCase(EntityType.FEED.name())
- || notUserWorkflowEngineIsOozie(args.userWorkflowEngine)) {
- // if replication wf, retention wf or PIG Process
- copyOozieLog(client, fs, path, jobInfo.getId());
-
- List<WorkflowAction> workflowActions = jobInfo.getActions();
- for (int i=0; i < workflowActions.size(); i++) {
- if (FALCON_ACTIONS.contains(workflowActions.get(i).getName())) {
- copyTTlogs(fs, path, jobInfo.getActions().get(i));
- break;
- }
- }
- } else {
- // if process wf with oozie engine
- String subflowId = jobInfo.getExternalId();
- copyOozieLog(client, fs, path, subflowId);
- WorkflowJob subflowInfo = client.getJobInfo(subflowId);
- List<WorkflowAction> actions = subflowInfo.getActions();
- for (WorkflowAction action : actions) {
- if (action.getType().equals("pig")
- || action.getType().equals("java")) {
- copyTTlogs(fs, path, action);
- } else {
- LOG.info("Ignoring hadoop TT log for non-pig and non-java action: {}", action.getName());
- }
- }
- }
-
- } catch (Exception e) {
- LOG.error("Exception in log mover", e);
- }
- return 0;
- }
-
- private boolean notUserWorkflowEngineIsOozie(String userWorkflowEngine) {
- // userWorkflowEngine will be null for replication and "pig" for pig
- return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) != EngineType.OOZIE;
- }
-
- private void copyOozieLog(OozieClient client, FileSystem fs, Path path,
- String id) throws OozieClientException, IOException {
- InputStream in = new ByteArrayInputStream(client.getJobLog(id).getBytes());
- OutputStream out = fs.create(new Path(path, "oozie.log"));
- IOUtils.copyBytes(in, out, 4096, true);
- LOG.info("Copied oozie log to {}", path);
- }
-
- private void copyTTlogs(FileSystem fs, Path path,
- WorkflowAction action) throws Exception {
- String ttLogURL = getTTlogURL(action.getExternalId());
- if (ttLogURL != null) {
- LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(), ttLogURL);
- InputStream in = getURLinputStream(new URL(ttLogURL));
- OutputStream out = fs.create(new Path(path, action.getName() + "_"
- + getMappedStatus(action.getStatus()) + ".log"));
- IOUtils.copyBytes(in, out, 4096, true);
- LOG.info("Copied log to {}", path);
- }
- }
-
- private String getMappedStatus(WorkflowAction.Status status) {
- if (status == WorkflowAction.Status.FAILED
- || status == WorkflowAction.Status.KILLED
- || status == WorkflowAction.Status.ERROR) {
- return "FAILED";
- } else {
- return "SUCCEEDED";
- }
- }
-
- private void setupArgs(String[] arguments, ARGS args) throws ParseException {
- Options options = new Options();
-
- Option opt = new Option("workflowEngineUrl", true, "url of workflow engine, ex:oozie");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("subflowId", true, "external id of userworkflow");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("userWorkflowEngine", true, "user workflow engine type");
- opt.setRequired(false); // replication will NOT have this arg sent
- options.addOption(opt);
-
- opt = new Option("runId", true, "current workflow's runid");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("logDir", true, "log dir where job logs are stored");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("status", true, "user workflow status");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("entityType", true, "entity type feed or process");
- opt.setRequired(true);
- options.addOption(opt);
-
- CommandLine cmd = new GnuParser().parse(options, arguments);
-
- args.oozieUrl = cmd.getOptionValue("workflowEngineUrl");
- args.subflowId = cmd.getOptionValue("subflowId");
- args.userWorkflowEngine = cmd.getOptionValue("userWorkflowEngine");
- args.runId = cmd.getOptionValue("runId");
- args.logDir = cmd.getOptionValue("logDir");
- args.entityType = cmd.getOptionValue("entityType");
- }
-
- private String getTTlogURL(String jobId) throws Exception {
- TaskLogURLRetriever logRetriever = ReflectionUtils.newInstance(getLogRetrieverClassName(), getConf());
- return logRetriever.retrieveTaskLogURL(jobId);
- }
-
- @SuppressWarnings("unchecked")
- private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName() {
- try {
- return (Class<? extends TaskLogURLRetriever>)
- Class.forName("org.apache.falcon.logging.v1.TaskLogRetrieverV1");
- } catch (ClassNotFoundException e) {
- LOG.warn("V1 Retriever missing, falling back to Default retriever");
- return DefaultTaskLogRetriever.class;
- }
- }
-
- private InputStream getURLinputStream(URL url) throws IOException {
- URLConnection connection = url.openConnection();
- connection.setDoOutput(true);
- connection.connect();
- return connection.getInputStream();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index d3befa2..0adc11b 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -17,12 +17,8 @@
*/
package org.apache.falcon.workflow;
-import org.apache.commons.cli.*;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.logging.LogMover;
-import org.apache.falcon.messaging.MessageProducer;
-import org.apache.falcon.metadata.LineageArgs;
-import org.apache.falcon.metadata.LineageRecorder;
+import org.apache.falcon.logging.JobLogMover;
+import org.apache.falcon.messaging.JMSMessageProducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.security.UserGroupInformation;
@@ -31,68 +27,12 @@ import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* Utility called by oozie workflow engine post workflow execution in parent workflow.
*/
public class FalconPostProcessing extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(FalconPostProcessing.class);
- /**
- * Args that the utility understands.
- */
- public enum Arg {
- CLUSTER("cluster", "name of the current cluster"),
- ENTITY_TYPE("entityType", "type of the entity"),
- ENTITY_NAME("entityName", "name of the entity"),
- NOMINAL_TIME("nominalTime", "instance time"),
- OPERATION("operation", "operation like generate, delete, replicate"),
- WORKFLOW_ID("workflowId", "current workflow-id of the instance"),
- RUN_ID("runId", "current run-id of the instance"),
- STATUS("status", "status of the user workflow instance"),
- TIMESTAMP("timeStamp", "current timestamp"),
- TOPIC_NAME("topicName", "name of the topic to be used to send JMS message"),
- BRKR_IMPL_CLASS("brokerImplClass", "falcon message broker Implementation class"),
- BRKR_URL("brokerUrl", "falcon message broker url"),
- USER_BRKR_IMPL_CLASS("userBrokerImplClass", "user broker Impl class"),
- USER_BRKR_URL("userBrokerUrl", "user broker url"),
- BRKR_TTL("brokerTTL", "time to live for broker message in sec"),
- FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"),
- FEED_INSTANCE_PATHS("feedInstancePaths", "comma separated feed instance paths"),
- LOG_FILE("logFile", "log file path where feeds to be deleted are recorded"),
- WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie"),
- USER_SUBFLOW_ID("subflowId", "external id of user workflow"),
- USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type"),
- USER_WORKFLOW_NAME("userWorkflowName", "user workflow name"),
- USER_WORKFLOW_VERSION("userWorkflowVersion", "user workflow version"),
- LOG_DIR("logDir", "log dir where job logs are copied"),
- WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"),
- INPUT_FEED_NAMES("falconInputFeeds", "name of the feeds which are used as inputs"),
- INPUT_FEED_PATHS("falconInputPaths", "comma separated input feed instance paths");
-
- private String name;
- private String description;
-
- Arg(String name, String description) {
- this.name = name;
- this.description = description;
- }
-
- public Option getOption() {
- return new Option(this.name, true, this.description);
- }
-
- public String getOptionName() {
- return this.name;
- }
-
- public String getOptionValue(CommandLine cmd) {
- return cmd.getOptionValue(this.name);
- }
- }
-
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new FalconPostProcessing(), args);
}
@@ -100,161 +40,46 @@ public class FalconPostProcessing extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
- CommandLine cmd = getCommand(args);
-
- LOG.info("Sending user message {}", cmd);
- invokeUserMessageProducer(cmd);
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(args,
+ WorkflowExecutionContext.Type.POST_PROCESSING);
+ LOG.info("Post workflow execution context created {}", context);
+ // serialize the context to HDFS under logs dir before sending the message
+ context.serialize();
- if ("SUCCEEDED".equals(Arg.STATUS.getOptionValue(cmd))) {
- LOG.info("Recording lineage for {}", cmd);
- recordLineageMetadata(cmd);
- }
+ LOG.info("Sending user message {} ", context);
+ invokeUserMessageProducer(context);
- //LogMover doesn't throw exception, a failed log mover will not fail the user workflow
- LOG.info("Moving logs {}", cmd);
- invokeLogProducer(cmd);
+ // JobLogMover doesn't throw exception, a failed log mover will not fail the user workflow
+ LOG.info("Moving logs {}", context);
+ invokeLogProducer(context);
- LOG.info("Sending falcon message {}", cmd);
- invokeFalconMessageProducer(cmd);
+ LOG.info("Sending falcon message {}", context);
+ invokeFalconMessageProducer(context);
return 0;
}
- private void invokeUserMessageProducer(CommandLine cmd) throws Exception {
- List<String> args = new ArrayList<String>();
- addArg(args, cmd, Arg.CLUSTER);
- addArg(args, cmd, Arg.ENTITY_TYPE);
- addArg(args, cmd, Arg.ENTITY_NAME);
- addArg(args, cmd, Arg.NOMINAL_TIME);
- addArg(args, cmd, Arg.OPERATION);
- addArg(args, cmd, Arg.WORKFLOW_ID);
- addArg(args, cmd, Arg.RUN_ID);
- addArg(args, cmd, Arg.STATUS);
- addArg(args, cmd, Arg.TIMESTAMP);
- //special args for user JMS message producer
- args.add("-" + Arg.TOPIC_NAME.getOptionName()); //user topic
- args.add("FALCON." + Arg.ENTITY_NAME.getOptionValue(cmd));
- //note, the user broker impl class arg name to MessageProducer is brokerImplClass
- args.add("-" + Arg.BRKR_IMPL_CLASS.getOptionName());
- args.add(Arg.USER_BRKR_IMPL_CLASS.getOptionValue(cmd));
- args.add("-" + Arg.BRKR_URL.getOptionName());
- args.add(Arg.USER_BRKR_URL.getOptionValue(cmd));
- addArg(args, cmd, Arg.BRKR_TTL);
- addArg(args, cmd, Arg.FEED_NAMES);
- addArg(args, cmd, Arg.FEED_INSTANCE_PATHS);
- addArg(args, cmd, Arg.LOG_FILE);
-
- MessageProducer.main(args.toArray(new String[0]));
+ private void invokeUserMessageProducer(WorkflowExecutionContext context) throws Exception {
+ JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+ .type(JMSMessageProducer.MessageType.USER)
+ .build();
+ jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
}
- private void invokeFalconMessageProducer(CommandLine cmd) throws Exception {
- List<String> args = new ArrayList<String>();
- addArg(args, cmd, Arg.CLUSTER);
- addArg(args, cmd, Arg.ENTITY_TYPE);
- addArg(args, cmd, Arg.ENTITY_NAME);
- addArg(args, cmd, Arg.NOMINAL_TIME);
- addArg(args, cmd, Arg.OPERATION);
- addArg(args, cmd, Arg.WORKFLOW_ID);
- addArg(args, cmd, Arg.RUN_ID);
- addArg(args, cmd, Arg.STATUS);
- addArg(args, cmd, Arg.TIMESTAMP);
- //special args Falcon JMS message producer
- args.add("-" + Arg.TOPIC_NAME.getOptionName());
- args.add("FALCON.ENTITY.TOPIC");
- args.add("-" + Arg.BRKR_IMPL_CLASS.getOptionName());
- args.add(Arg.BRKR_IMPL_CLASS.getOptionValue(cmd));
- args.add("-" + Arg.BRKR_URL.getOptionName());
- args.add(Arg.BRKR_URL.getOptionValue(cmd));
- addArg(args, cmd, Arg.BRKR_TTL);
- addArg(args, cmd, Arg.FEED_NAMES);
- addArg(args, cmd, Arg.FEED_INSTANCE_PATHS);
- addArg(args, cmd, Arg.LOG_FILE);
- addArg(args, cmd, Arg.WORKFLOW_USER);
- addArg(args, cmd, Arg.LOG_DIR);
-
- MessageProducer.main(args.toArray(new String[0]));
+ private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception {
+ JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+ .type(JMSMessageProducer.MessageType.FALCON)
+ .build();
+ jmsMessageProducer.sendMessage();
}
- private void invokeLogProducer(CommandLine cmd) throws Exception {
+ private void invokeLogProducer(WorkflowExecutionContext context) {
// todo: need to move this out to Falcon in-process
if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("Unable to move logs as security is enabled.");
return;
}
- List<String> args = new ArrayList<String>();
- addArg(args, cmd, Arg.WF_ENGINE_URL);
- addArg(args, cmd, Arg.ENTITY_TYPE);
- addArg(args, cmd, Arg.USER_SUBFLOW_ID);
- addArg(args, cmd, Arg.USER_WORKFLOW_ENGINE);
- addArg(args, cmd, Arg.RUN_ID);
- addArg(args, cmd, Arg.LOG_DIR);
- addArg(args, cmd, Arg.STATUS);
-
- LogMover.main(args.toArray(new String[0]));
- }
-
- private void recordLineageMetadata(CommandLine cmd) throws Exception {
- List<String> args = new ArrayList<String>();
-
- for (LineageArgs arg : LineageArgs.values()) {
- if (StringUtils.isNotEmpty(arg.getOptionValue(cmd))) {
- args.add("-" + arg.getOptionName());
- args.add(arg.getOptionValue(cmd));
- }
- }
-
- LineageRecorder.main(args.toArray(new String[args.size()]));
- }
-
- private void addArg(List<String> args, CommandLine cmd, Arg arg) {
- if (StringUtils.isNotEmpty(arg.getOptionValue(cmd))) {
- args.add("-" + arg.getOptionName());
- args.add(arg.getOptionValue(cmd));
- }
- }
-
- private static CommandLine getCommand(String[] arguments)
- throws ParseException {
-
- Options options = new Options();
- addOption(options, Arg.CLUSTER);
- addOption(options, Arg.ENTITY_TYPE);
- addOption(options, Arg.ENTITY_NAME);
- addOption(options, Arg.NOMINAL_TIME);
- addOption(options, Arg.OPERATION);
- addOption(options, Arg.WORKFLOW_ID);
- addOption(options, Arg.RUN_ID);
- addOption(options, Arg.STATUS);
- addOption(options, Arg.TIMESTAMP);
- addOption(options, Arg.BRKR_IMPL_CLASS);
- addOption(options, Arg.BRKR_URL);
- addOption(options, Arg.USER_BRKR_IMPL_CLASS);
- addOption(options, Arg.USER_BRKR_URL);
- addOption(options, Arg.BRKR_TTL);
- addOption(options, Arg.FEED_NAMES);
- addOption(options, Arg.FEED_INSTANCE_PATHS);
- addOption(options, Arg.LOG_FILE);
- addOption(options, Arg.WF_ENGINE_URL);
- addOption(options, Arg.USER_SUBFLOW_ID);
- addOption(options, Arg.USER_WORKFLOW_NAME, false);
- addOption(options, Arg.USER_WORKFLOW_VERSION, false);
- addOption(options, Arg.USER_WORKFLOW_ENGINE, false);
- addOption(options, Arg.LOG_DIR);
- addOption(options, Arg.WORKFLOW_USER);
- addOption(options, Arg.INPUT_FEED_NAMES, false);
- addOption(options, Arg.INPUT_FEED_PATHS, false);
-
- return new GnuParser().parse(options, arguments);
- }
-
- private static void addOption(Options options, Arg arg) {
- addOption(options, arg, true);
- }
-
- private static void addOption(Options options, Arg arg, boolean isRequired) {
- Option option = arg.getOption();
- option.setRequired(isRequired);
- options.addOption(option);
+ new JobLogMover().run(context);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 561303d..feddfdd 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -20,7 +20,7 @@ package org.apache.falcon.oozie.workflow;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.falcon.workflow.FalconPostProcessing;
-import org.apache.falcon.workflow.FalconPostProcessing.Arg;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -45,34 +45,34 @@ public class FalconPostProcessingTest {
@BeforeClass
public void setup() throws Exception {
args = new String[]{
- "-" + Arg.ENTITY_NAME.getOptionName(), ENTITY_NAME,
- "-" + Arg.FEED_NAMES.getOptionName(), "out-click-logs,out-raw-logs",
- "-" + Arg.FEED_INSTANCE_PATHS.getOptionName(),
+ "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
+ "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "out-click-logs,out-raw-logs",
+ "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
"/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20",
- "-" + Arg.WORKFLOW_ID.getOptionName(), "workflow-01-00",
- "-" + Arg.WORKFLOW_USER.getOptionName(), "falcon",
- "-" + Arg.RUN_ID.getOptionName(), "1",
- "-" + Arg.NOMINAL_TIME.getOptionName(), "2011-01-01-01-00",
- "-" + Arg.TIMESTAMP.getOptionName(), "2012-01-01-01-00",
- "-" + Arg.BRKR_URL.getOptionName(), BROKER_URL,
- "-" + Arg.BRKR_IMPL_CLASS.getOptionName(), (BROKER_IMPL_CLASS),
- "-" + Arg.USER_BRKR_URL.getOptionName(), BROKER_URL,
- "-" + Arg.USER_BRKR_IMPL_CLASS.getOptionName(), (BROKER_IMPL_CLASS),
- "-" + Arg.ENTITY_TYPE.getOptionName(), ("process"),
- "-" + Arg.OPERATION.getOptionName(), ("GENERATE"),
- "-" + Arg.LOG_FILE.getOptionName(), ("/logFile"),
- "-" + Arg.STATUS.getOptionName(), ("SUCCEEDED"),
- "-" + Arg.BRKR_TTL.getOptionName(), "10",
- "-" + Arg.CLUSTER.getOptionName(), "corp",
- "-" + Arg.WF_ENGINE_URL.getOptionName(), "http://localhost:11000/oozie/",
- "-" + Arg.LOG_DIR.getOptionName(), "target/log",
- "-" + Arg.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test",
- "-" + Arg.USER_WORKFLOW_ENGINE.getOptionName(), "oozie",
- "-" + Arg.INPUT_FEED_NAMES.getOptionName(), "in-click-logs,in-raw-logs",
- "-" + Arg.INPUT_FEED_PATHS.getOptionName(),
+ "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+ "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
+ "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+ "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), "2011-01-01-01-00",
+ "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), "2012-01-01-01-00",
+ "-" + WorkflowExecutionArgs.BRKR_URL.getName(), BROKER_URL,
+ "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
+ "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), BROKER_URL,
+ "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
+ "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
+ "-" + WorkflowExecutionArgs.OPERATION.getName(), "GENERATE",
+ "-" + WorkflowExecutionArgs.LOG_FILE.getName(), "/logFile",
+ "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+ "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "10",
+ "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), "corp",
+ "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie/",
+ "-" + WorkflowExecutionArgs.LOG_DIR.getName(), "target/log",
+ "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id" + "test",
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), "oozie",
+ "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), "in-click-logs,in-raw-logs",
+ "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(),
"/in-click-logs/10/05/05/00/20,/in-raw-logs/10/05/05/00/20",
- "-" + Arg.USER_WORKFLOW_NAME.getOptionName(), "test-workflow",
- "-" + Arg.USER_WORKFLOW_VERSION.getOptionName(), "1.0.0",
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), "test-workflow",
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), "1.0.0",
};
broker = new BrokerService();
@@ -95,8 +95,8 @@ public class FalconPostProcessingTest {
@Override
public void run() {
try {
- consumer(BROKER_URL, "FALCON." + ENTITY_NAME);
- consumer(BROKER_URL, FALCON_TOPIC_NAME);
+ consumer(BROKER_URL, "FALCON." + ENTITY_NAME); // user message
+ consumer(BROKER_URL, FALCON_TOPIC_NAME); // falcon message
} catch (AssertionError e) {
error = e;
} catch (JMSException ignore) {
@@ -131,13 +131,13 @@ public class FalconPostProcessingTest {
assertMessage(m);
if (topic.equals(FALCON_TOPIC_NAME)) {
- Assert.assertEquals(m.getString(Arg.FEED_NAMES.getOptionName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()),
"out-click-logs,out-raw-logs");
- Assert.assertEquals(m.getString(Arg.FEED_INSTANCE_PATHS.getOptionName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
"/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20");
} else {
- Assert.assertEquals(m.getString(Arg.FEED_NAMES.getOptionName()), "out-click-logs");
- Assert.assertEquals(m.getString(Arg.FEED_INSTANCE_PATHS.getOptionName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()), "out-click-logs");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
"/out-click-logs/10/05/05/00/20");
}
@@ -145,15 +145,13 @@ public class FalconPostProcessingTest {
}
private void assertMessage(MapMessage m) throws JMSException {
- Assert.assertEquals(m.getString(Arg.ENTITY_NAME.getOptionName()), "agg-coord");
- Assert.assertEquals(m.getString(Arg.WORKFLOW_ID.getOptionName()), "workflow-01-00");
- String workflowUser = m.getString(Arg.WORKFLOW_USER.getOptionName());
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.ENTITY_NAME.getName()), "agg-coord");
+ String workflowUser = m.getString(WorkflowExecutionArgs.WORKFLOW_USER.getName());
if (workflowUser != null) { // in case of user message, its NULL
Assert.assertEquals(workflowUser, "falcon");
}
- Assert.assertEquals(m.getString(Arg.RUN_ID.getOptionName()), "1");
- Assert.assertEquals(m.getString(Arg.NOMINAL_TIME.getOptionName()), "2011-01-01T01:00Z");
- Assert.assertEquals(m.getString(Arg.TIMESTAMP.getOptionName()), "2012-01-01T01:00Z");
- Assert.assertEquals(m.getString(Arg.STATUS.getOptionName()), "SUCCEEDED");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()), "2011-01-01T01:00Z");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()), "2012-01-01T01:00Z");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
index 51def35..89132d6 100644
--- a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
+++ b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
@@ -117,7 +117,7 @@ public class PigProcessIT {
.get(InstancesResult.class);
Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
- // verify LogMover
+ // verify JobLogMover
Path oozieLogPath = OozieTestUtils.getOozieLogPath(context.getCluster().getCluster(), jobInfo);
Assert.assertTrue(context.getCluster().getFileSystem().exists(oozieLogPath));
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
index de8bc33..e67fe2a 100644
--- a/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
+++ b/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
@@ -22,8 +22,9 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.logging.LogMover;
+import org.apache.falcon.logging.JobLogMover;
import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.engine.OozieClientFactory;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.BundleJob;
@@ -176,13 +177,17 @@ public final class OozieTestUtils {
public static Path getOozieLogPath(Cluster cluster, WorkflowJob jobInfo) throws Exception {
Path stagingPath = EntityUtil.getLogPath(cluster, cluster);
final Path logPath = new Path(ClusterHelper.getStorageUrl(cluster), stagingPath);
- LogMover.main(new String[] {
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(new String[] {
"-workflowEngineUrl", ClusterHelper.getOozieUrl(cluster),
- "-subflowId", jobInfo.getId(), "-runId", "1",
+ "-subflowId", jobInfo.getId(),
+ "-runId", "1",
"-logDir", logPath.toString() + "/job-2012-04-21-00-00",
- "-status", "SUCCEEDED", "-entityType", "process",
+ "-status", "SUCCEEDED",
+ "-entityType", "process",
"-userWorkflowEngine", "pig",
- });
+ }, WorkflowExecutionContext.Type.POST_PROCESSING);
+
+ new JobLogMover().run(context);
return new Path(logPath, "job-2012-04-21-00-00/001/oozie.log");
}