You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/03/05 02:18:36 UTC
falcon git commit: yes
Repository: falcon
Updated Branches:
refs/heads/master 6c787783b -> f40926669
yes
Add running, succeeded, failed, killed and suspended instances to the Titan DB based on JMS notifications for workflow jobs. Instance-entity edge properties (e.g. nominal time, status) are also added for vertex-centric indexing.
Author: yzheng-hortonworks <yz...@hortonworks.com>
Reviewers: Balu Vellanki <bv...@hortonworks.com>, Venkatesan Ramachandran <Me...@gmail.com>
Closes #47 from yzheng-hortonworks/FALCON-1111 and squashes the following commits:
f2c9faf [yzheng-hortonworks] review by Balu
ff9556b [yzheng-hortonworks] review from Venky
b76f642 [yzheng-hortonworks] FALCON-1111 Instance update on titan DB based on JMS notifications on workflow jobs
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f4092666
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f4092666
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f4092666
Branch: refs/heads/master
Commit: f40926669a68d7ad3d548d558e490df6d53a3e0b
Parents: 6c78778
Author: yzheng-hortonworks <yz...@hortonworks.com>
Authored: Fri Mar 4 17:18:38 2016 -0800
Committer: bvellanki <bv...@hortonworks.com>
Committed: Fri Mar 4 17:18:38 2016 -0800
----------------------------------------------------------------------
.../InstanceRelationshipGraphBuilder.java | 89 ++++++++++----------
.../falcon/metadata/MetadataMappingService.java | 83 ++++++++++--------
.../metadata/RelationshipGraphBuilder.java | 9 +-
.../falcon/metadata/RelationshipProperty.java | 6 +-
common/src/main/resources/startup.properties | 4 +
src/conf/startup.properties | 4 +
6 files changed, 112 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index b709857..1d48f75 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import java.util.TimeZone;
/**
@@ -78,8 +80,9 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
RelationshipType.PROCESS_INSTANCE, context.getTimeStampAsLong());
addWorkflowInstanceProperties(processInstance, context);
+ Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context);
addInstanceToEntity(processInstance, context.getEntityName(),
- RelationshipType.PROCESS_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
+ RelationshipType.PROCESS_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE, properties);
addInstanceToEntity(processInstance, context.getClusterName(),
RelationshipType.CLUSTER_ENTITY, RelationshipLabel.PROCESS_CLUSTER_EDGE);
addInstanceToEntity(processInstance, context.getWorkflowUser(),
@@ -153,15 +156,14 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
public void addInstanceToEntity(Vertex instanceVertex, String entityName,
RelationshipType entityType, RelationshipLabel edgeLabel,
- String timestamp) {
+ Map<RelationshipProperty, String> properties) {
Vertex entityVertex = findVertex(entityName, entityType);
LOG.info("Vertex exists? name={}, type={}, v={}", entityName, entityType, entityVertex);
if (entityVertex == null) {
LOG.error("Illegal State: {} vertex must exist for {}", entityType, entityName);
throw new IllegalStateException(entityType + " entity vertex must exist " + entityName);
}
-
- addEdge(instanceVertex, entityVertex, edgeLabel.getName(), timestamp);
+ addEdge(instanceVertex, entityVertex, edgeLabel.getName(), properties);
}
public void addOutputFeedInstances(WorkflowExecutionContext context,
@@ -215,19 +217,13 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
feedInstanceDataPath, targetClusterName);
String feedInstanceName = getFeedInstanceName(feedName, targetClusterName,
feedInstanceDataPath, context.getNominalTimeAsISO8601());
- Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
-
- LOG.info("Vertex exists? name={}, type={}, v={}",
- feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
- if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
- LOG.info("{} instance vertex {} does not exist, add it",
- RelationshipType.FEED_INSTANCE, feedInstanceName);
- feedInstanceVertex = addFeedInstance(// add a new instance
- feedInstanceName, context, feedName, context.getSrcClusterName());
- }
+ Vertex feedInstanceVertex = addFeedInstance(
+ feedInstanceName, context, feedName, context.getSrcClusterName());
+ Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context);
+ properties.put(RelationshipProperty.TIMESTAMP, context.getTimeStampAsISO8601());
addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY,
- RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601());
+ RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, properties);
addCounters(feedInstanceVertex, context);
}
@@ -250,20 +246,13 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
feedName, evictedFeedInstancePath, clusterName);
String feedInstanceName = getFeedInstanceName(feedName, clusterName,
evictedFeedInstancePath, context.getNominalTimeAsISO8601());
- Vertex feedInstanceVertex = findVertex(feedInstanceName,
- RelationshipType.FEED_INSTANCE);
-
- LOG.info("Vertex exists? name={}, type={}, v={}",
- feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
- if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
- LOG.info("{} instance vertex {} does not exist, add it",
- RelationshipType.FEED_INSTANCE, feedInstanceName);
- feedInstanceVertex = addFeedInstance(// add a new instance
- feedInstanceName, context, feedName, clusterName);
- }
+ Vertex feedInstanceVertex = addFeedInstance(
+ feedInstanceName, context, feedName, clusterName);
+ Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context);
+ properties.put(RelationshipProperty.TIMESTAMP, context.getTimeStampAsISO8601());
addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY,
- RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE, context.getTimeStampAsISO8601());
+ RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE, properties);
}
}
@@ -280,20 +269,15 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
feedInstanceDataPath, sourceClusterName, datasourceName);
String feedInstanceName = getFeedInstanceName(feedName, sourceClusterName,
feedInstanceDataPath, context.getNominalTimeAsISO8601());
- Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
-
- LOG.info("Vertex exists? name={}, type={}, v={}",
- feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
- if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
- LOG.info("{} instance vertex {} does not exist, add it",
- RelationshipType.FEED_INSTANCE, feedInstanceName);
- feedInstanceVertex = addFeedInstance(// add a new instance
- feedInstanceName, context, feedName, context.getSrcClusterName());
- }
+ Vertex feedInstanceVertex = addFeedInstance(
+ feedInstanceName, context, feedName, context.getSrcClusterName());
+
+ Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context);
+ properties.put(RelationshipProperty.TIMESTAMP, context.getTimeStampAsISO8601());
addInstanceToEntity(feedInstanceVertex, datasourceName, RelationshipType.DATASOURCE_ENTITY,
- RelationshipLabel.DATASOURCE_IMPORT_EDGE, context.getTimeStampAsISO8601());
+ RelationshipLabel.DATASOURCE_IMPORT_EDGE, properties);
addInstanceToEntity(feedInstanceVertex, sourceClusterName, RelationshipType.CLUSTER_ENTITY,
- RelationshipLabel.FEED_CLUSTER_EDGE, context.getTimeStampAsISO8601());
+ RelationshipLabel.FEED_CLUSTER_EDGE, properties);
}
public String getImportInstanceName(WorkflowExecutionContext context) {
@@ -308,18 +292,30 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
feedInstanceDataPath, clusterName);
String feedInstanceName = getFeedInstanceName(feedName, clusterName,
feedInstanceDataPath, context.getNominalTimeAsISO8601());
- Vertex feedInstance = addFeedInstance(feedInstanceName, context, feedName, clusterName);
+ Vertex feedInstance = addFeedInstance(feedInstanceName, context, feedName, clusterName, false);
addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
}
private Vertex addFeedInstance(String feedInstanceName, WorkflowExecutionContext context,
String feedName, String clusterName) throws FalconException {
+ return addFeedInstance(feedInstanceName, context, feedName, clusterName, true);
+ }
+
+ private Vertex addFeedInstance(String feedInstanceName, WorkflowExecutionContext context, String feedName,
+ String clusterName, boolean hasEdgeProperties) throws FalconException {
LOG.info("Adding feed instance {}", feedInstanceName);
Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
context.getTimeStampAsLong());
-
- addInstanceToEntity(feedInstance, feedName,
- RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
+ feedInstance.setProperty(RelationshipProperty.STATUS.getName(), context.getValue(WorkflowExecutionArgs.STATUS));
+
+ if (hasEdgeProperties) {
+ Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context);
+ addInstanceToEntity(feedInstance, feedName,
+ RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE, properties);
+ } else {
+ addInstanceToEntity(feedInstance, feedName,
+ RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
+ }
addInstanceToEntity(feedInstance, clusterName,
RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE);
addInstanceToEntity(feedInstance, context.getWorkflowUser(),
@@ -334,6 +330,13 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
return feedInstance;
}
+ private Map<RelationshipProperty, String> edgePropertiesForIndexing(WorkflowExecutionContext context) {
+ Map<RelationshipProperty, String> properties = new HashMap<RelationshipProperty, String>();
+ properties.put(RelationshipProperty.NOMINAL_TIME, context.getNominalTimeAsISO8601());
+ properties.put(RelationshipProperty.STATUS, context.getValue(WorkflowExecutionArgs.STATUS));
+ return properties;
+ }
+
public static String getFeedInstanceName(String feedName, String clusterName,
String feedInstancePath,
String nominalTime) throws FalconException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index cf2b651..7d22fd5 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -257,14 +257,42 @@ public class MetadataMappingService
}
@Override
+ public void onStart(final WorkflowExecutionContext context) throws FalconException {
+ LOG.info("onStart {}", context);
+ onInstanceExecutionUpdate(context);
+ }
+
+ @Override
public void onSuccess(final WorkflowExecutionContext context) throws FalconException {
- LOG.info("Adding lineage for context {}", context);
+ LOG.info("onSuccess {}", context);
+ onInstanceExecutionUpdate(context);
+ }
+
+ @Override
+ public void onFailure(final WorkflowExecutionContext context) throws FalconException {
+ LOG.info("onFailure {}", context);
+ onInstanceExecutionUpdate(context);
+ }
+
+ @Override
+ public void onSuspend(final WorkflowExecutionContext context) throws FalconException {
+ LOG.info("onSuspend {}", context);
+ onInstanceExecutionUpdate(context);
+ }
+
+ @Override
+ public void onWait(final WorkflowExecutionContext context) throws FalconException {
+ LOG.info("onWait {}", context);
+ onInstanceExecutionUpdate(context);
+ }
+
+ private void onInstanceExecutionUpdate(final WorkflowExecutionContext context) throws FalconException {
try {
new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
.perform(new TransactionWork<Void>() {
@Override
public Void execute(TransactionalGraph transactionalGraph) throws Exception {
- onSuccessfulExecution(context);
+ updateInstanceStatus(context);
transactionalGraph.commit();
return null;
}
@@ -275,64 +303,49 @@ public class MetadataMappingService
}
}
- private void onSuccessfulExecution(final WorkflowExecutionContext context) throws FalconException {
+ private void updateInstanceStatus(final WorkflowExecutionContext context) throws FalconException {
+ if (context.getContextType() == WorkflowExecutionContext.Type.COORDINATOR_ACTION) {
+ // TODO(yzheng): FALCON-1776 Instance update on titan DB based on JMS notifications on coordinator actions
+ return;
+ }
+
WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation();
switch (entityOperation) {
case GENERATE:
- onProcessInstanceExecuted(context);
+ updateProcessInstance(context);
break;
case REPLICATE:
- onFeedInstanceReplicated(context);
+ updateReplicatedFeedInstance(context);
break;
case DELETE:
- onFeedInstanceEvicted(context);
+ updateEvictedFeedInstance(context);
break;
case IMPORT:
- onFeedInstanceImported(context);
+ updateImportedFeedInstance(context);
break;
default:
throw new IllegalArgumentException("Invalid EntityOperation - " + entityOperation);
}
}
- @Override
- public void onFailure(WorkflowExecutionContext context) throws FalconException {
- // do nothing since lineage is only recorded for successful workflow
- }
-
- @Override
- public void onStart(WorkflowExecutionContext context) throws FalconException {
- // Do nothing
- }
-
- @Override
- public void onSuspend(WorkflowExecutionContext context) throws FalconException {
- // Do nothing
- }
-
- @Override
- public void onWait(WorkflowExecutionContext context) throws FalconException {
- // TBD
- }
-
-
- private void onProcessInstanceExecuted(WorkflowExecutionContext context) throws FalconException {
+ private void updateProcessInstance(WorkflowExecutionContext context) throws FalconException {
+ LOG.info("Updating process instance: {}", context.getNominalTimeAsISO8601());
Vertex processInstance = instanceGraphBuilder.addProcessInstance(context);
instanceGraphBuilder.addOutputFeedInstances(context, processInstance);
instanceGraphBuilder.addInputFeedInstances(context, processInstance);
}
- private void onFeedInstanceReplicated(WorkflowExecutionContext context) throws FalconException {
- LOG.info("Adding replicated feed instance: {}", context.getNominalTimeAsISO8601());
+ private void updateReplicatedFeedInstance(WorkflowExecutionContext context) throws FalconException {
+ LOG.info("Updating replicated feed instance: {}", context.getNominalTimeAsISO8601());
instanceGraphBuilder.addReplicatedInstance(context);
}
- private void onFeedInstanceEvicted(WorkflowExecutionContext context) throws FalconException {
- LOG.info("Adding evicted feed instance: {}", context.getNominalTimeAsISO8601());
+ private void updateEvictedFeedInstance(WorkflowExecutionContext context) throws FalconException {
+ LOG.info("Updating evicted feed instance: {}", context.getNominalTimeAsISO8601());
instanceGraphBuilder.addEvictedInstance(context);
}
- private void onFeedInstanceImported(WorkflowExecutionContext context) throws FalconException {
- LOG.info("Adding imported feed instance: {}", context.getNominalTimeAsISO8601());
+ private void updateImportedFeedInstance(WorkflowExecutionContext context) throws FalconException {
+ LOG.info("Updating imported feed instance: {}", context.getNominalTimeAsISO8601());
instanceGraphBuilder.addImportedInstance(context);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
index 0c3fcee..32caca3 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Iterator;
+import java.util.Map;
/**
* Base class for Metadata relationship mapping helper.
@@ -113,12 +114,14 @@ public abstract class RelationshipGraphBuilder {
}
protected Edge addEdge(Vertex fromVertex, Vertex toVertex,
- String edgeLabel, String timestamp) {
+ String edgeLabel, Map<RelationshipProperty, String> properties) {
Edge edge = findEdge(fromVertex, toVertex, edgeLabel);
Edge edgeToVertex = edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex);
- if (timestamp != null) {
- edgeToVertex.setProperty(RelationshipProperty.TIMESTAMP.getName(), timestamp);
+ if (properties != null) {
+ for (Map.Entry<RelationshipProperty, String> property : properties.entrySet()) {
+ edgeToVertex.setProperty(property.getKey().getName(), property.getValue());
+ }
}
return edgeToVertex;
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java
index ff437d9..fd23d4f 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java
@@ -23,7 +23,7 @@ package org.apache.falcon.metadata;
*/
public enum RelationshipProperty {
- // vertex property keys - indexed
+ // vertex/edge property keys - indexed
NAME("name"),
TYPE("type"),
TIMESTAMP("timestamp"),
@@ -39,8 +39,10 @@ public enum RelationshipProperty {
RUN_ID("runId", "current run-id of the instance"),
STATUS("status", "status of the user workflow instance"),
WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex: oozie"),
- USER_SUBFLOW_ID("subflowId", "external id of user workflow");
+ USER_SUBFLOW_ID("subflowId", "external id of user workflow"),
+ // instance-entity edge property
+ NOMINAL_TIME("nominalTime");
private final String name;
private final String description;
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 2497cce..123d63c 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -156,6 +156,10 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
*.falcon.graph.transaction.retry.count=3
*.falcon.graph.transaction.retry.delay=5
+# Avoid acquiring read lock when iterating over large graphs
+# See http://s3.thinkaurelius.com/docs/titan/0.5.4/bdb.html
+*.falcon.graph.storage.transactions=false
+
# Uncomment and override the following properties for enabling metrics for titan db and pushing them to graphite. You
# can use other reporters like ganglia also.
# Refer (http://thinkaurelius.github.io/titan/wikidoc/0.4.2/Titan-Performance-and-Monitoring)for finding the
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index faaedfa..51a791e 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -169,6 +169,10 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
*.falcon.graph.transaction.retry.count=3
*.falcon.graph.transaction.retry.delay=5
+# Avoid acquiring read lock when iterating over large graphs
+# See http://s3.thinkaurelius.com/docs/titan/0.5.4/bdb.html
+*.falcon.graph.storage.transactions=false
+
# Uncomment and override the following properties for enabling metrics for titan db and pushing them to graphite. You
# can use other reporters like ganglia also.
# Refer (http://thinkaurelius.github.io/titan/wikidoc/0.4.2/Titan-Performance-and-Monitoring)for finding the