You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/08 22:31:39 UTC
[6/8] git commit: FALCON-488 - Introduce Workflow Context in Lineage Service. Contributed by Venkatesh Seetharam
FALCON-488 - Introduce Workflow Context in Lineage Service. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a0994437
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a0994437
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a0994437
Branch: refs/heads/master
Commit: a099443758045d625da5d2bb382898e5e13009ab
Parents: b9781d1
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 13:01:37 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 13:01:37 2014 -0700
----------------------------------------------------------------------
.../EntityRelationshipGraphBuilder.java | 6 +-
.../InstanceRelationshipGraphBuilder.java | 100 ++++++------
.../org/apache/falcon/metadata/LineageArgs.java | 82 ----------
.../apache/falcon/metadata/LineageRecorder.java | 154 -------------------
.../falcon/metadata/MetadataMappingService.java | 59 +++----
.../metadata/RelationshipGraphBuilder.java | 10 --
.../falcon/metadata/LineageRecorderTest.java | 103 -------------
.../metadata/MetadataMappingServiceTest.java | 71 +++++----
.../metadata/LineageMetadataResourceTest.java | 76 +++++----
9 files changed, 174 insertions(+), 487 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 2f46ff4..364d8f1 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,10 +167,11 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
public void addWorkflowProperties(Workflow workflow, Vertex processVertex, String processName) {
- processVertex.setProperty(LineageArgs.USER_WORKFLOW_NAME.getOptionName(),
+ processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
processVertex.setProperty(RelationshipProperty.VERSION.getName(), workflow.getVersion());
- processVertex.setProperty(LineageArgs.USER_WORKFLOW_ENGINE.getOptionName(), workflow.getEngine().value());
+ processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
+ workflow.getEngine().value());
}
public void updateWorkflowProperties(Workflow oldWorkflow, Workflow newWorkflow,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 7ee185f..34857a3 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -34,9 +34,10 @@ import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Process;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import java.net.URISyntaxException;
-import java.util.Map;
/**
* Instance Metadata relationship mapping helper.
@@ -45,18 +46,17 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
private static final Logger LOG = LoggerFactory.getLogger(InstanceRelationshipGraphBuilder.class);
- private static final String PROCESS_INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // nominal time
private static final String FEED_INSTANCE_FORMAT = "yyyyMMddHHmm"; // computed
// process workflow properties from message
- private static final String[] INSTANCE_WORKFLOW_PROPERTIES = {
- LineageArgs.USER_WORKFLOW_NAME.getOptionName(),
- LineageArgs.USER_WORKFLOW_ENGINE.getOptionName(),
- LineageArgs.WORKFLOW_ID.getOptionName(),
- LineageArgs.RUN_ID.getOptionName(),
- LineageArgs.STATUS.getOptionName(),
- LineageArgs.WF_ENGINE_URL.getOptionName(),
- LineageArgs.USER_SUBFLOW_ID.getOptionName(),
+ private static final WorkflowExecutionArgs[] INSTANCE_WORKFLOW_PROPERTIES = {
+ WorkflowExecutionArgs.USER_WORKFLOW_NAME,
+ WorkflowExecutionArgs.USER_WORKFLOW_ENGINE,
+ WorkflowExecutionArgs.WORKFLOW_ID,
+ WorkflowExecutionArgs.RUN_ID,
+ WorkflowExecutionArgs.STATUS,
+ WorkflowExecutionArgs.WF_ENGINE_URL,
+ WorkflowExecutionArgs.USER_SUBFLOW_ID,
};
@@ -64,49 +64,51 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
super(graph, preserveHistory);
}
- public Vertex addProcessInstance(Map<String, String> lineageMetadata) throws FalconException {
- String entityName = lineageMetadata.get(LineageArgs.ENTITY_NAME.getOptionName());
- String processInstanceName = getProcessInstanceName(entityName,
- lineageMetadata.get(LineageArgs.NOMINAL_TIME.getOptionName()));
- LOG.info("Adding process instance: {}", processInstanceName);
+ public Vertex addProcessInstance(WorkflowExecutionContext context) throws FalconException {
+ String processInstanceName = getProcessInstanceName(context);
+ LOG.info("Adding process instance: " + processInstanceName);
- String timestamp = getTimestamp(lineageMetadata);
- Vertex processInstance = addVertex(processInstanceName, RelationshipType.PROCESS_INSTANCE, timestamp);
- addWorkflowInstanceProperties(processInstance, lineageMetadata);
+ Vertex processInstance = addVertex(processInstanceName,
+ RelationshipType.PROCESS_INSTANCE, context.getTimeStampAsISO8601());
+ addWorkflowInstanceProperties(processInstance, context);
- addInstanceToEntity(processInstance, entityName,
+ addInstanceToEntity(processInstance, context.getEntityName(),
RelationshipType.PROCESS_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
- addInstanceToEntity(processInstance, lineageMetadata.get(LineageArgs.CLUSTER.getOptionName()),
+ addInstanceToEntity(processInstance, context.getClusterName(),
RelationshipType.CLUSTER_ENTITY, RelationshipLabel.PROCESS_CLUSTER_EDGE);
- addInstanceToEntity(processInstance, lineageMetadata.get(LineageArgs.WORKFLOW_USER.getOptionName()),
+ addInstanceToEntity(processInstance, context.getWorkflowUser(),
RelationshipType.USER, RelationshipLabel.USER);
if (isPreserveHistory()) {
- Process process = ConfigurationStore.get().get(EntityType.PROCESS, entityName);
+ Process process = ConfigurationStore.get().get(EntityType.PROCESS, context.getEntityName());
addDataClassification(process.getTags(), processInstance);
}
return processInstance;
}
- private String getTimestamp(Map<String, String> lineageMetadata) {
- String timestamp = lineageMetadata.get(LineageArgs.TIMESTAMP.getOptionName());
- return SchemaHelper.formatDateUTCToISO8601(timestamp, PROCESS_INSTANCE_FORMAT);
+ public String getProcessInstanceName(WorkflowExecutionContext context) {
+ return context.getEntityName() + "/" + context.getNominalTimeAsISO8601();
}
public void addWorkflowInstanceProperties(Vertex processInstance,
- Map<String, String> lineageMetadata) {
- for (String instanceWorkflowProperty : INSTANCE_WORKFLOW_PROPERTIES) {
- addProperty(processInstance, lineageMetadata, instanceWorkflowProperty);
+ WorkflowExecutionContext context) {
+ for (WorkflowExecutionArgs instanceWorkflowProperty : INSTANCE_WORKFLOW_PROPERTIES) {
+ addProperty(processInstance, context, instanceWorkflowProperty);
}
+
processInstance.setProperty(RelationshipProperty.VERSION.getName(),
- lineageMetadata.get(LineageArgs.USER_WORKFLOW_VERSION.getOptionName()));
+ context.getUserWorkflowVersion());
}
- public String getProcessInstanceName(String entityName,
- String nominalTime) {
- return entityName + "/"
- + SchemaHelper.formatDateUTCToISO8601(nominalTime, PROCESS_INSTANCE_FORMAT);
+ private void addProperty(Vertex vertex, WorkflowExecutionContext context,
+ WorkflowExecutionArgs optionName) {
+ String value = context.getValue(optionName);
+ if (value == null || value.length() == 0) {
+ return;
+ }
+
+ vertex.setProperty(optionName.getName(), value);
}
public void addInstanceToEntity(Vertex instanceVertex, String entityName,
@@ -122,37 +124,33 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
addEdge(instanceVertex, entityVertex, edgeLabel.getName());
}
- public void addOutputFeedInstances(Map<String, String> lineageMetadata,
+ public void addOutputFeedInstances(WorkflowExecutionContext context,
Vertex processInstance) throws FalconException {
- String outputFeedNamesArg = lineageMetadata.get(LineageArgs.FEED_NAMES.getOptionName());
+ String outputFeedNamesArg = context.getOutputFeedNames();
if ("NONE".equals(outputFeedNamesArg)) {
return; // there are no output feeds for this process
}
- String[] outputFeedNames = outputFeedNamesArg.split(",");
- String[] outputFeedInstancePaths =
- lineageMetadata.get(LineageArgs.FEED_INSTANCE_PATHS.getOptionName()).split(",");
+ String[] outputFeedNames = context.getOutputFeedNamesList();
+ String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList();
for (int index = 0; index < outputFeedNames.length; index++) {
String feedName = outputFeedNames[index];
String feedInstanceDataPath = outputFeedInstancePaths[index];
addFeedInstance(processInstance, RelationshipLabel.PROCESS_FEED_EDGE,
- lineageMetadata, feedName, feedInstanceDataPath);
+ context, feedName, feedInstanceDataPath);
}
}
- public void addInputFeedInstances(Map<String, String> lineageMetadata,
+ public void addInputFeedInstances(WorkflowExecutionContext context,
Vertex processInstance) throws FalconException {
- String inputFeedNamesArg = lineageMetadata.get(LineageArgs.INPUT_FEED_NAMES.getOptionName());
+ String inputFeedNamesArg = context.getInputFeedNames();
if ("NONE".equals(inputFeedNamesArg)) {
return; // there are no input feeds for this process
}
- String[] inputFeedNames =
- lineageMetadata.get(LineageArgs.INPUT_FEED_NAMES.getOptionName()).split("#");
- // Each input feed is separated by #
- String[] inputFeedInstancePaths =
- lineageMetadata.get(LineageArgs.INPUT_FEED_PATHS.getOptionName()).split("#");
+ String[] inputFeedNames = context.getInputFeedNamesList();
+ String[] inputFeedInstancePaths = context.getInputFeedInstancePathsList();
for (int index = 0; index < inputFeedNames.length; index++) {
String inputFeedName = inputFeedNames[index];
@@ -162,21 +160,21 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
for (String feedInstanceDataPath : feedInstancePaths) {
addFeedInstance(processInstance, RelationshipLabel.FEED_PROCESS_EDGE,
- lineageMetadata, inputFeedName, feedInstanceDataPath);
+ context, inputFeedName, feedInstanceDataPath);
}
}
}
private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel,
- Map<String, String> lineageMetadata, String feedName,
+ WorkflowExecutionContext context, String feedName,
String feedInstanceDataPath) throws FalconException {
- String clusterName = lineageMetadata.get(LineageArgs.CLUSTER.getOptionName());
+ String clusterName = context.getClusterName();
LOG.info("Computing feed instance for : name=" + feedName + ", path= "
+ feedInstanceDataPath + ", in cluster: " + clusterName);
String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstanceDataPath);
LOG.info("Adding feed instance: " + feedInstanceName);
Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
- getTimestamp(lineageMetadata));
+ context.getTimeStampAsISO8601());
addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
@@ -184,7 +182,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
addInstanceToEntity(feedInstance, clusterName,
RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE);
- addInstanceToEntity(feedInstance, lineageMetadata.get(LineageArgs.WORKFLOW_USER.getOptionName()),
+ addInstanceToEntity(feedInstance, context.getWorkflowUser(),
RelationshipType.USER, RelationshipLabel.USER);
if (isPreserveHistory()) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java b/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
deleted file mode 100644
index e91f7ab..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-
-/**
- * Args for data Lineage.
- */
-@Deprecated // delete this class
-public enum LineageArgs {
- // process instance
- NOMINAL_TIME("nominalTime", "instance time"),
- ENTITY_TYPE("entityType", "type of the entity"),
- ENTITY_NAME("entityName", "name of the entity"),
- TIMESTAMP("timeStamp", "current timestamp"),
-
- // where
- CLUSTER("cluster", "name of the current cluster"),
- OPERATION("operation", "operation like generate, delete, replicate"),
-
- // who
- WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"),
-
- // workflow details
- WORKFLOW_ID("workflowId", "current workflow-id of the instance"),
- RUN_ID("runId", "current run-id of the instance"),
- STATUS("status", "status of the user workflow isnstance"),
- WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie"),
- USER_SUBFLOW_ID("subflowId", "external id of user workflow"),
- USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type"),
- USER_WORKFLOW_NAME("userWorkflowName", "user workflow name"),
- USER_WORKFLOW_VERSION("userWorkflowVersion", "user workflow version"),
-
- // what inputs
- INPUT_FEED_NAMES("falconInputFeeds", "name of the feeds which are used as inputs"),
- INPUT_FEED_PATHS("falconInputPaths", "comma separated input feed instance paths"),
-
- // what outputs
- FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"),
- FEED_INSTANCE_PATHS("feedInstancePaths", "comma separated feed instance paths"),
-
- // lineage data recorded
- LOG_DIR("logDir", "log dir where lineage can be recorded");
-
- private String name;
- private String description;
-
- LineageArgs(String name, String description) {
- this.name = name;
- this.description = description;
- }
-
- public Option getOption() {
- return new Option(this.name, true, this.description);
- }
-
- public String getOptionName() {
- return this.name;
- }
-
- public String getOptionValue(CommandLine cmd) {
- return cmd.getOptionValue(this.name);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java b/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
deleted file mode 100644
index 5ab1bf0..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Utility called in the post process of oozie workflow to record lineage information.
- */
-@Deprecated // delete this class
-public class LineageRecorder extends Configured implements Tool {
-
- private static final Logger LOG = LoggerFactory.getLogger(LineageRecorder.class);
-
- public static void main(String[] args) throws Exception {
- ToolRunner.run(new LineageRecorder(), args);
- }
-
- @Override
- public int run(String[] arguments) throws Exception {
- CommandLine command = getCommand(arguments);
-
- LOG.info("Parsing lineage metadata from: {}", command);
- Map<String, String> lineageMetadata = getLineageMetadata(command);
- LOG.info("Lineage Metadata: {}", lineageMetadata);
-
- String lineageFile = getFilePath(command.getOptionValue(LineageArgs.LOG_DIR.getOptionName()),
- command.getOptionValue(LineageArgs.ENTITY_NAME.getOptionName())
- );
-
- LOG.info("Persisting lineage metadata to: {}", lineageFile);
- persistLineageMetadata(lineageMetadata, lineageFile);
-
- return 0;
- }
-
- protected static CommandLine getCommand(String[] arguments) throws ParseException {
-
- Options options = new Options();
-
- for (LineageArgs arg : LineageArgs.values()) {
- addOption(options, arg);
- }
-
- return new GnuParser().parse(options, arguments);
- }
-
- private static void addOption(Options options, LineageArgs arg) {
- addOption(options, arg, true);
- }
-
- private static void addOption(Options options, LineageArgs arg, boolean isRequired) {
- Option option = arg.getOption();
- option.setRequired(isRequired);
- options.addOption(option);
- }
-
- protected Map<String, String> getLineageMetadata(CommandLine command) {
- Map<String, String> lineageMetadata = new HashMap<String, String>();
-
- for (LineageArgs arg : LineageArgs.values()) {
- lineageMetadata.put(arg.getOptionName(), arg.getOptionValue(command));
- }
-
- return lineageMetadata;
- }
-
- public static String getFilePath(String logDir, String entityName) {
- return logDir + entityName + "-lineage.json";
- }
-
- /**
- * this method is invoked from with in the workflow.
- *
- * @param lineageMetadata metadata to persist
- * @param lineageFile file to serialize the metadata
- * @throws IOException
- * @throws FalconException
- */
- protected void persistLineageMetadata(Map<String, String> lineageMetadata,
- String lineageFile) throws IOException, FalconException {
- OutputStream out = null;
- Path file = new Path(lineageFile);
- try {
- FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri(), getConf());
- out = fs.create(file);
-
- // making sure falcon can read this file
- FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
- fs.setPermission(file, permission);
-
- out.write(JSONValue.toJSONString(lineageMetadata).getBytes());
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException ignore) {
- // ignore
- }
- }
- }
- }
-
- public static Map<String, String> parseLineageMetadata(String lineageFile) throws FalconException {
- try {
- Path lineageDataPath = new Path(lineageFile); // file has 777 permissions
- FileSystem fs = HadoopClientFactory.get().createFileSystem(lineageDataPath.toUri());
-
- BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(lineageDataPath)));
- return (Map<String, String>) JSONValue.parse(in);
- } catch (IOException e) {
- throw new FalconException("Error opening lineage file", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 5df4611..80e96ba 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -36,9 +36,13 @@ import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
+import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
import java.util.Map;
import java.util.Properties;
@@ -47,7 +51,8 @@ import java.util.Set;
/**
* Metadata relationship mapping service. Maps relationships into a graph database.
*/
-public class MetadataMappingService implements FalconService, ConfigurationChangeListener {
+public class MetadataMappingService
+ implements FalconService, ConfigurationChangeListener, WorkflowExecutionListener {
private static final Logger LOG = LoggerFactory.getLogger(MetadataMappingService.class);
@@ -92,6 +97,8 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
instanceGraphBuilder = new InstanceRelationshipGraphBuilder(graph, preserveHistory);
ConfigurationStore.get().registerListener(this);
+ Services.get().<WorkflowJobEndNotificationService>getService(
+ WorkflowJobEndNotificationService.NAME).registerListener(this);
}
protected Graph initializeGraphDB() {
@@ -179,6 +186,9 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
@Override
public void destroy() throws FalconException {
+ Services.get().<WorkflowJobEndNotificationService>getService(
+ WorkflowJobEndNotificationService.NAME).unregisterListener(this);
+
LOG.info("Shutting down graph db");
graph.shutdown();
}
@@ -244,54 +254,47 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
// are already added to the graph
}
- /**
- * Entity operations.
- */
- public enum EntityOperations {
- GENERATE, REPLICATE, DELETE
- }
-
- public void onSuccessfulWorkflowCompletion(String entityName, String operation,
- String logDir) throws FalconException {
- String lineageFile = LineageRecorder.getFilePath(logDir, entityName);
-
- LOG.info("Parsing lineage metadata from: {}", lineageFile);
- Map<String, String> lineageMetadata = LineageRecorder.parseLineageMetadata(lineageFile);
-
- EntityOperations entityOperation = EntityOperations.valueOf(operation);
+ @Override
+ public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+ WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation();
- LOG.info("Adding lineage for entity: {}, operation: {}", entityName, operation);
+ LOG.info("Adding lineage for context {}", context);
switch (entityOperation) {
case GENERATE:
- onProcessInstanceAdded(lineageMetadata);
+ onProcessInstanceExecuted(context);
getTransactionalGraph().commit();
break;
case REPLICATE:
- onFeedInstanceReplicated(lineageMetadata);
+ onFeedInstanceReplicated(context);
break;
case DELETE:
- onFeedInstanceEvicted(lineageMetadata);
+ onFeedInstanceEvicted(context);
break;
default:
}
}
- private void onProcessInstanceAdded(Map<String, String> lineageMetadata) throws FalconException {
- Vertex processInstance = instanceGraphBuilder.addProcessInstance(lineageMetadata);
- instanceGraphBuilder.addOutputFeedInstances(lineageMetadata, processInstance);
- instanceGraphBuilder.addInputFeedInstances(lineageMetadata, processInstance);
+ @Override
+ public void onFailure(WorkflowExecutionContext context) throws FalconException {
+ // do nothing since lineage is only recorded for successful workflow
+ }
+
+ private void onProcessInstanceExecuted(WorkflowExecutionContext context) throws FalconException {
+ Vertex processInstance = instanceGraphBuilder.addProcessInstance(context);
+ instanceGraphBuilder.addOutputFeedInstances(context, processInstance);
+ instanceGraphBuilder.addInputFeedInstances(context, processInstance);
}
- private void onFeedInstanceReplicated(Map<String, String> lineageMetadata) {
- LOG.info("Adding replicated feed instance: {}", lineageMetadata.get(LineageArgs.NOMINAL_TIME.getOptionName()));
+ private void onFeedInstanceReplicated(WorkflowExecutionContext context) {
+ LOG.info("Adding replicated feed instance: {}", context.getNominalTimeAsISO8601());
// todo - tbd
}
- private void onFeedInstanceEvicted(Map<String, String> lineageMetadata) {
- LOG.info("Adding evicted feed instance: {}", lineageMetadata.get(LineageArgs.NOMINAL_TIME.getOptionName()));
+ private void onFeedInstanceEvicted(WorkflowExecutionContext context) {
+ LOG.info("Adding evicted feed instance: {}", context.getNominalTimeAsISO8601());
// todo - tbd
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
index 9ee0ea6..640df41 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Iterator;
-import java.util.Map;
/**
* Base class for Metadata relationship mapping helper.
@@ -192,13 +191,4 @@ public abstract class RelationshipGraphBuilder {
protected String getCurrentTimeStamp() {
return SchemaHelper.formatDateUTC(new Date());
}
-
- protected void addProperty(Vertex vertex, Map<String, String> lineageMetadata, String optionName) {
- String value = lineageMetadata.get(optionName);
- if (value == null || value.length() == 0) {
- return;
- }
-
- vertex.setProperty(optionName, value);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/test/java/org/apache/falcon/metadata/LineageRecorderTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/LineageRecorderTest.java b/common/src/test/java/org/apache/falcon/metadata/LineageRecorderTest.java
deleted file mode 100644
index d686ab6..0000000
--- a/common/src/test/java/org/apache/falcon/metadata/LineageRecorderTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.Map;
-
-/**
- * Unit test for lineage recorder.
- */
-public class LineageRecorderTest {
-
- private static final String LOGS_DIR = "target/log";
- private static final String NOMINAL_TIME = "2014-01-01-01-00";
- private static final String ENTITY_NAME = "test-process-entity";
-
- private String[] args;
-
- @BeforeMethod
- public void setUp() throws Exception {
- args = new String[]{
- "-" + LineageArgs.ENTITY_NAME.getOptionName(), ENTITY_NAME,
- "-" + LineageArgs.FEED_NAMES.getOptionName(), "out-click-logs,out-raw-logs",
- "-" + LineageArgs.FEED_INSTANCE_PATHS.getOptionName(),
- "/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20",
- "-" + LineageArgs.WORKFLOW_ID.getOptionName(), "workflow-01-00",
- "-" + LineageArgs.WORKFLOW_USER.getOptionName(), "falcon",
- "-" + LineageArgs.RUN_ID.getOptionName(), "1",
- "-" + LineageArgs.NOMINAL_TIME.getOptionName(), NOMINAL_TIME,
- "-" + LineageArgs.TIMESTAMP.getOptionName(), "2014-01-01-01-00",
- "-" + LineageArgs.ENTITY_TYPE.getOptionName(), ("process"),
- "-" + LineageArgs.OPERATION.getOptionName(), ("GENERATE"),
- "-" + LineageArgs.STATUS.getOptionName(), ("SUCCEEDED"),
- "-" + LineageArgs.CLUSTER.getOptionName(), "corp",
- "-" + LineageArgs.WF_ENGINE_URL.getOptionName(), "http://localhost:11000/oozie/",
- "-" + LineageArgs.LOG_DIR.getOptionName(), LOGS_DIR,
- "-" + LineageArgs.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test",
- "-" + LineageArgs.USER_WORKFLOW_ENGINE.getOptionName(), "oozie",
- "-" + LineageArgs.INPUT_FEED_NAMES.getOptionName(), "in-click-logs,in-raw-logs",
- "-" + LineageArgs.INPUT_FEED_PATHS.getOptionName(),
- "/in-click-logs/10/05/05/00/20,/in-raw-logs/10/05/05/00/20",
- "-" + LineageArgs.USER_WORKFLOW_NAME.getOptionName(), "test-workflow",
- "-" + LineageArgs.USER_WORKFLOW_VERSION.getOptionName(), "1.0.0",
- };
- }
-
- @AfterMethod
- public void tearDown() throws Exception {
-
- }
-
- @Test
- public void testMain() throws Exception {
- LineageRecorder lineageRecorder = new LineageRecorder();
- CommandLine command = LineageRecorder.getCommand(args);
- Map<String, String> lineageMetadata = lineageRecorder.getLineageMetadata(command);
-
- LineageRecorder.main(args);
-
- String lineageFile = LineageRecorder.getFilePath(LOGS_DIR, ENTITY_NAME);
- Path lineageDataPath = new Path(lineageFile);
- FileSystem fs = lineageDataPath.getFileSystem(new Configuration());
- Assert.assertTrue(fs.exists(lineageDataPath));
-
- Map<String, String> recordedLineageMetadata =
- LineageRecorder.parseLineageMetadata(lineageFile);
-
- for (Map.Entry<String, String> entry : lineageMetadata.entrySet()) {
- Assert.assertEquals(lineageMetadata.get(entry.getKey()),
- recordedLineageMetadata.get(entry.getKey()));
- }
- }
-
- @Test
- public void testGetFilePath() throws Exception {
- String path = LOGS_DIR + ENTITY_NAME + "-lineage.json";
- Assert.assertEquals(path, LineageRecorder.getFilePath(LOGS_DIR, ENTITY_NAME));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 52bb7ae..c94de10 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -36,7 +36,11 @@ import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -75,6 +79,8 @@ public class MetadataMappingServiceTest {
public static final String OUTPUT_INSTANCE_PATHS =
"jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
+ public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
+
private ConfigurationStore configStore;
private MetadataMappingService service;
@@ -91,6 +97,7 @@ public class MetadataMappingServiceTest {
configStore = ConfigurationStore.get();
+ Services.get().register(new WorkflowJobEndNotificationService());
StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
service = new MetadataMappingService();
service.init();
@@ -218,9 +225,9 @@ public class MetadataMappingServiceTest {
service.destroy();
service.init();
- LineageRecorder.main(getTestMessageArgs());
-
- service.onSuccessfulWorkflowCompletion(PROCESS_ENTITY_NAME, OPERATION, LOGS_DIR);
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
+ WorkflowExecutionContext.Type.POST_PROCESSING);
+ service.onSuccess(context);
debug(service.getGraph());
GraphUtils.dump(service.getGraph());
@@ -592,31 +599,39 @@ public class MetadataMappingServiceTest {
private static String[] getTestMessageArgs() {
return new String[]{
- "-" + LineageArgs.NOMINAL_TIME.getOptionName(), NOMINAL_TIME,
- "-" + LineageArgs.TIMESTAMP.getOptionName(), NOMINAL_TIME,
-
- "-" + LineageArgs.ENTITY_NAME.getOptionName(), PROCESS_ENTITY_NAME,
- "-" + LineageArgs.ENTITY_TYPE.getOptionName(), ("process"),
- "-" + LineageArgs.CLUSTER.getOptionName(), CLUSTER_ENTITY_NAME,
- "-" + LineageArgs.OPERATION.getOptionName(), OPERATION,
-
- "-" + LineageArgs.INPUT_FEED_NAMES.getOptionName(), INPUT_FEED_NAMES,
- "-" + LineageArgs.INPUT_FEED_PATHS.getOptionName(), INPUT_INSTANCE_PATHS,
-
- "-" + LineageArgs.FEED_NAMES.getOptionName(), OUTPUT_FEED_NAMES,
- "-" + LineageArgs.FEED_INSTANCE_PATHS.getOptionName(), OUTPUT_INSTANCE_PATHS,
-
- "-" + LineageArgs.WORKFLOW_ID.getOptionName(), "workflow-01-00",
- "-" + LineageArgs.WORKFLOW_USER.getOptionName(), FALCON_USER,
- "-" + LineageArgs.RUN_ID.getOptionName(), "1",
- "-" + LineageArgs.STATUS.getOptionName(), "SUCCEEDED",
- "-" + LineageArgs.WF_ENGINE_URL.getOptionName(), "http://localhost:11000/oozie",
- "-" + LineageArgs.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id",
- "-" + LineageArgs.USER_WORKFLOW_NAME.getOptionName(), WORKFLOW_NAME,
- "-" + LineageArgs.USER_WORKFLOW_VERSION.getOptionName(), WORKFLOW_VERSION,
- "-" + LineageArgs.USER_WORKFLOW_ENGINE.getOptionName(), EngineType.PIG.name(),
-
- "-" + LineageArgs.LOG_DIR.getOptionName(), LOGS_DIR,
+ "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME,
+ "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"),
+ "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME,
+ "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
+ "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION,
+
+ "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
+ "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
+
+ "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+ "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+
+ "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+ "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
+ "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+ "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+ "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
+
+ "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
+ "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME,
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
+
+
+ "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER,
+ "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+ "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER,
+ "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+ "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
+
+ "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
+ "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
};
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
index d664782..a82acda 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
@@ -34,14 +34,15 @@ import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.metadata.LineageArgs;
-import org.apache.falcon.metadata.LineageRecorder;
import org.apache.falcon.metadata.MetadataMappingService;
import org.apache.falcon.metadata.RelationshipLabel;
import org.apache.falcon.metadata.RelationshipProperty;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
import org.json.simple.JSONValue;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -95,10 +96,14 @@ public class LineageMetadataResourceTest {
configStore = ConfigurationStore.get();
+ Services.get().register(new WorkflowJobEndNotificationService());
+ Assert.assertTrue(Services.get().isRegistered(WorkflowJobEndNotificationService.NAME));
+
StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
service = new MetadataMappingService();
service.init();
Services.get().register(service);
+ Assert.assertTrue(Services.get().isRegistered(MetadataMappingService.SERVICE_NAME));
addClusterEntity();
addFeedEntity();
@@ -110,8 +115,11 @@ public class LineageMetadataResourceTest {
public void tearDown() throws Exception {
cleanupGraphStore(service.getGraph());
cleanupConfigurationStore(configStore);
+
service.destroy();
+
StartupProperties.get().setProperty("falcon.graph.preserve.history", "false");
+ Services.get().reset();
}
@Test (expectedExceptions = WebApplicationException.class)
@@ -397,6 +405,7 @@ public class LineageMetadataResourceTest {
Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
Assert.assertEquals(response.getEntity().toString(), "Lineage Metadata Service is not enabled.");
} finally {
+ Services.get().register(new WorkflowJobEndNotificationService());
Services.get().register(service);
}
}
@@ -517,37 +526,46 @@ public class LineageMetadataResourceTest {
}
public void addInstance() throws Exception {
- LineageRecorder.main(getTestMessageArgs());
- service.onSuccessfulWorkflowCompletion(PROCESS_ENTITY_NAME, OPERATION, LOGS_DIR);
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
+ WorkflowExecutionContext.Type.POST_PROCESSING);
+ service.onSuccess(context);
}
private static String[] getTestMessageArgs() {
return new String[]{
- "-" + LineageArgs.NOMINAL_TIME.getOptionName(), NOMINAL_TIME,
- "-" + LineageArgs.TIMESTAMP.getOptionName(), NOMINAL_TIME,
-
- "-" + LineageArgs.ENTITY_NAME.getOptionName(), PROCESS_ENTITY_NAME,
- "-" + LineageArgs.ENTITY_TYPE.getOptionName(), ("process"),
- "-" + LineageArgs.CLUSTER.getOptionName(), CLUSTER_ENTITY_NAME,
- "-" + LineageArgs.OPERATION.getOptionName(), OPERATION,
-
- "-" + LineageArgs.INPUT_FEED_NAMES.getOptionName(), INPUT_FEED_NAMES,
- "-" + LineageArgs.INPUT_FEED_PATHS.getOptionName(), INPUT_INSTANCE_PATHS,
-
- "-" + LineageArgs.FEED_NAMES.getOptionName(), OUTPUT_FEED_NAMES,
- "-" + LineageArgs.FEED_INSTANCE_PATHS.getOptionName(), OUTPUT_INSTANCE_PATHS,
-
- "-" + LineageArgs.WORKFLOW_ID.getOptionName(), "workflow-01-00",
- "-" + LineageArgs.WORKFLOW_USER.getOptionName(), FALCON_USER,
- "-" + LineageArgs.RUN_ID.getOptionName(), "1",
- "-" + LineageArgs.STATUS.getOptionName(), "SUCCEEDED",
- "-" + LineageArgs.WF_ENGINE_URL.getOptionName(), "http://localhost:11000/oozie",
- "-" + LineageArgs.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id",
- "-" + LineageArgs.USER_WORKFLOW_NAME.getOptionName(), WORKFLOW_NAME,
- "-" + LineageArgs.USER_WORKFLOW_VERSION.getOptionName(), WORKFLOW_VERSION,
- "-" + LineageArgs.USER_WORKFLOW_ENGINE.getOptionName(), EngineType.PIG.name(),
-
- "-" + LineageArgs.LOG_DIR.getOptionName(), LOGS_DIR,
+ "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME,
+ "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"),
+ "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME,
+ "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
+ "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION,
+
+ "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
+ "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
+
+ "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+ "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+
+ "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+ "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
+ "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+ "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+ "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
+
+ "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
+ "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME,
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
+
+
+ "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), "blah",
+ "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+ "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), "blah",
+ "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+ "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
+
+ "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
+ "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
};
}