You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/03/05 02:18:36 UTC

falcon git commit: FALCON-1111 Instance update on titan DB based on JMS notifications on workflow jobs

Repository: falcon
Updated Branches:
  refs/heads/master 6c787783b -> f40926669


FALCON-1111 Instance update on titan DB based on JMS notifications on workflow jobs

Add running, succeeded, failed, killed and suspended instances to titan DB based on JMS notifications on workflow jobs. Instance-entity edge properties (e.g. nominal time, status) are also added for vertex-centric indexing.

Author: yzheng-hortonworks <yz...@hortonworks.com>

Reviewers: Balu Vellanki <bv...@hortonworks.com>, Venkatesan Ramachandran <Me...@gmail.com>

Closes #47 from yzheng-hortonworks/FALCON-1111 and squashes the following commits:

f2c9faf [yzheng-hortonworks] review by Balu
ff9556b [yzheng-hortonworks] review from Venky
b76f642 [yzheng-hortonworks] FALCON-1111 Instance update on titan DB based on JMS notifications on workflow jobs


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f4092666
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f4092666
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f4092666

Branch: refs/heads/master
Commit: f40926669a68d7ad3d548d558e490df6d53a3e0b
Parents: 6c78778
Author: yzheng-hortonworks <yz...@hortonworks.com>
Authored: Fri Mar 4 17:18:38 2016 -0800
Committer: bvellanki <bv...@hortonworks.com>
Committed: Fri Mar 4 17:18:38 2016 -0800

----------------------------------------------------------------------
 .../InstanceRelationshipGraphBuilder.java       | 89 ++++++++++----------
 .../falcon/metadata/MetadataMappingService.java | 83 ++++++++++--------
 .../metadata/RelationshipGraphBuilder.java      |  9 +-
 .../falcon/metadata/RelationshipProperty.java   |  6 +-
 common/src/main/resources/startup.properties    |  4 +
 src/conf/startup.properties                     |  4 +
 6 files changed, 112 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index b709857..1d48f75 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URISyntaxException;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.TimeZone;
 
 /**
@@ -78,8 +80,9 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
                 RelationshipType.PROCESS_INSTANCE, context.getTimeStampAsLong());
         addWorkflowInstanceProperties(processInstance, context);
 
+        Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context);
         addInstanceToEntity(processInstance, context.getEntityName(),
-                RelationshipType.PROCESS_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
+                RelationshipType.PROCESS_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE, properties);
         addInstanceToEntity(processInstance, context.getClusterName(),
                 RelationshipType.CLUSTER_ENTITY, RelationshipLabel.PROCESS_CLUSTER_EDGE);
         addInstanceToEntity(processInstance, context.getWorkflowUser(),
@@ -153,15 +156,14 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
 
     public void addInstanceToEntity(Vertex instanceVertex, String entityName,
                                     RelationshipType entityType, RelationshipLabel edgeLabel,
-                                    String timestamp) {
+                                    Map<RelationshipProperty, String> properties) {
         Vertex entityVertex = findVertex(entityName, entityType);
         LOG.info("Vertex exists? name={}, type={}, v={}", entityName, entityType, entityVertex);
         if (entityVertex == null) {
             LOG.error("Illegal State: {} vertex must exist for {}", entityType, entityName);
             throw new IllegalStateException(entityType + " entity vertex must exist " + entityName);
         }
-
-        addEdge(instanceVertex, entityVertex, edgeLabel.getName(), timestamp);
+        addEdge(instanceVertex, entityVertex, edgeLabel.getName(), properties);
     }
 
     public void addOutputFeedInstances(WorkflowExecutionContext context,
@@ -215,19 +217,13 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
                 feedInstanceDataPath, targetClusterName);
         String feedInstanceName = getFeedInstanceName(feedName, targetClusterName,
                 feedInstanceDataPath, context.getNominalTimeAsISO8601());
-        Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
-
-        LOG.info("Vertex exists? name={}, type={}, v={}",
-                feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
-        if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
-            LOG.info("{} instance vertex {} does not exist, add it",
-                    RelationshipType.FEED_INSTANCE, feedInstanceName);
-            feedInstanceVertex = addFeedInstance(// add a new instance
-                    feedInstanceName, context, feedName, context.getSrcClusterName());
-        }
+        Vertex feedInstanceVertex = addFeedInstance(
+                feedInstanceName, context, feedName, context.getSrcClusterName());
 
+        Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context);
+        properties.put(RelationshipProperty.TIMESTAMP, context.getTimeStampAsISO8601());
         addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY,
-                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601());
+                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, properties);
 
         addCounters(feedInstanceVertex, context);
     }
@@ -250,20 +246,13 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
                     feedName, evictedFeedInstancePath, clusterName);
             String feedInstanceName = getFeedInstanceName(feedName, clusterName,
                     evictedFeedInstancePath, context.getNominalTimeAsISO8601());
-            Vertex feedInstanceVertex = findVertex(feedInstanceName,
-                    RelationshipType.FEED_INSTANCE);
-
-            LOG.info("Vertex exists? name={}, type={}, v={}",
-                    feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
-            if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
-                LOG.info("{} instance vertex {} does not exist, add it",
-                        RelationshipType.FEED_INSTANCE, feedInstanceName);
-                feedInstanceVertex = addFeedInstance(// add a new instance
-                        feedInstanceName, context, feedName, clusterName);
-            }
+            Vertex feedInstanceVertex = addFeedInstance(
+                    feedInstanceName, context, feedName, clusterName);
 
+            Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context);
+            properties.put(RelationshipProperty.TIMESTAMP, context.getTimeStampAsISO8601());
             addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY,
-                    RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE, context.getTimeStampAsISO8601());
+                    RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE, properties);
         }
     }
 
@@ -280,20 +269,15 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
                 feedInstanceDataPath, sourceClusterName, datasourceName);
         String feedInstanceName = getFeedInstanceName(feedName, sourceClusterName,
                 feedInstanceDataPath, context.getNominalTimeAsISO8601());
-        Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
-
-        LOG.info("Vertex exists? name={}, type={}, v={}",
-                feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
-        if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
-            LOG.info("{} instance vertex {} does not exist, add it",
-                    RelationshipType.FEED_INSTANCE, feedInstanceName);
-            feedInstanceVertex = addFeedInstance(// add a new instance
-                    feedInstanceName, context, feedName, context.getSrcClusterName());
-        }
+        Vertex feedInstanceVertex = addFeedInstance(
+                feedInstanceName, context, feedName, context.getSrcClusterName());
+
+        Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context);
+        properties.put(RelationshipProperty.TIMESTAMP, context.getTimeStampAsISO8601());
         addInstanceToEntity(feedInstanceVertex, datasourceName, RelationshipType.DATASOURCE_ENTITY,
-                RelationshipLabel.DATASOURCE_IMPORT_EDGE, context.getTimeStampAsISO8601());
+                RelationshipLabel.DATASOURCE_IMPORT_EDGE, properties);
         addInstanceToEntity(feedInstanceVertex, sourceClusterName, RelationshipType.CLUSTER_ENTITY,
-                RelationshipLabel.FEED_CLUSTER_EDGE, context.getTimeStampAsISO8601());
+                RelationshipLabel.FEED_CLUSTER_EDGE, properties);
     }
 
     public String getImportInstanceName(WorkflowExecutionContext context) {
@@ -308,18 +292,30 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
                 feedInstanceDataPath, clusterName);
         String feedInstanceName = getFeedInstanceName(feedName, clusterName,
                 feedInstanceDataPath, context.getNominalTimeAsISO8601());
-        Vertex feedInstance = addFeedInstance(feedInstanceName, context, feedName, clusterName);
+        Vertex feedInstance = addFeedInstance(feedInstanceName, context, feedName, clusterName, false);
         addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
     }
 
     private Vertex addFeedInstance(String feedInstanceName, WorkflowExecutionContext context,
                                    String feedName, String clusterName) throws FalconException {
+        return addFeedInstance(feedInstanceName, context, feedName, clusterName, true);
+    }
+
+    private Vertex addFeedInstance(String feedInstanceName, WorkflowExecutionContext context, String feedName,
+                                   String clusterName, boolean hasEdgeProperties) throws FalconException {
         LOG.info("Adding feed instance {}", feedInstanceName);
         Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
                 context.getTimeStampAsLong());
-
-        addInstanceToEntity(feedInstance, feedName,
-                RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
+        feedInstance.setProperty(RelationshipProperty.STATUS.getName(), context.getValue(WorkflowExecutionArgs.STATUS));
+
+        if (hasEdgeProperties) {
+            Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context);
+            addInstanceToEntity(feedInstance, feedName,
+                    RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE, properties);
+        } else {
+            addInstanceToEntity(feedInstance, feedName,
+                    RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
+        }
         addInstanceToEntity(feedInstance, clusterName,
                 RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE);
         addInstanceToEntity(feedInstance, context.getWorkflowUser(),
@@ -334,6 +330,13 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         return feedInstance;
     }
 
+    private Map<RelationshipProperty, String> edgePropertiesForIndexing(WorkflowExecutionContext context) {
+        Map<RelationshipProperty, String> properties = new HashMap<RelationshipProperty, String>();
+        properties.put(RelationshipProperty.NOMINAL_TIME, context.getNominalTimeAsISO8601());
+        properties.put(RelationshipProperty.STATUS, context.getValue(WorkflowExecutionArgs.STATUS));
+        return properties;
+    }
+
     public static String getFeedInstanceName(String feedName, String clusterName,
                                              String feedInstancePath,
                                              String nominalTime) throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index cf2b651..7d22fd5 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -257,14 +257,42 @@ public class MetadataMappingService
     }
 
     @Override
+    public void onStart(final WorkflowExecutionContext context) throws FalconException {
+        LOG.info("onStart {}", context);
+        onInstanceExecutionUpdate(context);
+    }
+
+    @Override
     public void onSuccess(final WorkflowExecutionContext context) throws FalconException {
-        LOG.info("Adding lineage for context {}", context);
+        LOG.info("onSuccess {}", context);
+        onInstanceExecutionUpdate(context);
+    }
+
+    @Override
+    public void onFailure(final WorkflowExecutionContext context) throws FalconException {
+        LOG.info("onFailure {}", context);
+        onInstanceExecutionUpdate(context);
+    }
+
+    @Override
+    public void onSuspend(final WorkflowExecutionContext context) throws FalconException {
+        LOG.info("onSuspend {}", context);
+        onInstanceExecutionUpdate(context);
+    }
+
+    @Override
+    public void onWait(final WorkflowExecutionContext context) throws FalconException {
+        LOG.info("onWait {}", context);
+        onInstanceExecutionUpdate(context);
+    }
+
+    private void onInstanceExecutionUpdate(final WorkflowExecutionContext context) throws FalconException {
         try {
             new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
                     .perform(new TransactionWork<Void>() {
                         @Override
                         public Void execute(TransactionalGraph transactionalGraph) throws Exception {
-                            onSuccessfulExecution(context);
+                            updateInstanceStatus(context);
                             transactionalGraph.commit();
                             return null;
                         }
@@ -275,64 +303,49 @@ public class MetadataMappingService
         }
     }
 
-    private void onSuccessfulExecution(final WorkflowExecutionContext context) throws FalconException {
+    private void updateInstanceStatus(final WorkflowExecutionContext context) throws FalconException {
+        if (context.getContextType() == WorkflowExecutionContext.Type.COORDINATOR_ACTION) {
+            // TODO(yzheng): FALCON-1776 Instance update on titan DB based on JMS notifications on coordinator actions
+            return;
+        }
+
         WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation();
         switch (entityOperation) {
         case GENERATE:
-            onProcessInstanceExecuted(context);
+            updateProcessInstance(context);
             break;
         case REPLICATE:
-            onFeedInstanceReplicated(context);
+            updateReplicatedFeedInstance(context);
             break;
         case DELETE:
-            onFeedInstanceEvicted(context);
+            updateEvictedFeedInstance(context);
             break;
         case IMPORT:
-            onFeedInstanceImported(context);
+            updateImportedFeedInstance(context);
             break;
         default:
             throw new IllegalArgumentException("Invalid EntityOperation - " + entityOperation);
         }
     }
 
-    @Override
-    public void onFailure(WorkflowExecutionContext context) throws FalconException {
-        // do nothing since lineage is only recorded for successful workflow
-    }
-
-    @Override
-    public void onStart(WorkflowExecutionContext context) throws FalconException {
-        // Do nothing
-    }
-
-    @Override
-    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
-        // Do nothing
-    }
-
-    @Override
-    public void onWait(WorkflowExecutionContext context) throws FalconException {
-        // TBD
-    }
-
-
-    private void onProcessInstanceExecuted(WorkflowExecutionContext context) throws FalconException {
+    private void updateProcessInstance(WorkflowExecutionContext context) throws FalconException {
+        LOG.info("Updating process instance: {}", context.getNominalTimeAsISO8601());
         Vertex processInstance = instanceGraphBuilder.addProcessInstance(context);
         instanceGraphBuilder.addOutputFeedInstances(context, processInstance);
         instanceGraphBuilder.addInputFeedInstances(context, processInstance);
     }
 
-    private void onFeedInstanceReplicated(WorkflowExecutionContext context) throws FalconException {
-        LOG.info("Adding replicated feed instance: {}", context.getNominalTimeAsISO8601());
+    private void updateReplicatedFeedInstance(WorkflowExecutionContext context) throws FalconException {
+        LOG.info("Updating replicated feed instance: {}", context.getNominalTimeAsISO8601());
         instanceGraphBuilder.addReplicatedInstance(context);
     }
 
-    private void onFeedInstanceEvicted(WorkflowExecutionContext context) throws FalconException {
-        LOG.info("Adding evicted feed instance: {}", context.getNominalTimeAsISO8601());
+    private void updateEvictedFeedInstance(WorkflowExecutionContext context) throws FalconException {
+        LOG.info("Updating evicted feed instance: {}", context.getNominalTimeAsISO8601());
         instanceGraphBuilder.addEvictedInstance(context);
     }
-    private void onFeedInstanceImported(WorkflowExecutionContext context) throws FalconException {
-        LOG.info("Adding imported feed instance: {}", context.getNominalTimeAsISO8601());
+    private void updateImportedFeedInstance(WorkflowExecutionContext context) throws FalconException {
+        LOG.info("Updating imported feed instance: {}", context.getNominalTimeAsISO8601());
         instanceGraphBuilder.addImportedInstance(context);
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
index 0c3fcee..32caca3 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Date;
 import java.util.Iterator;
+import java.util.Map;
 
 /**
  * Base class for Metadata relationship mapping helper.
@@ -113,12 +114,14 @@ public abstract class RelationshipGraphBuilder {
     }
 
     protected Edge addEdge(Vertex fromVertex, Vertex toVertex,
-                           String edgeLabel, String timestamp) {
+                           String edgeLabel, Map<RelationshipProperty, String> properties) {
         Edge edge = findEdge(fromVertex, toVertex, edgeLabel);
 
         Edge edgeToVertex = edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex);
-        if (timestamp != null) {
-            edgeToVertex.setProperty(RelationshipProperty.TIMESTAMP.getName(), timestamp);
+        if (properties != null) {
+            for (Map.Entry<RelationshipProperty, String> property : properties.entrySet()) {
+                edgeToVertex.setProperty(property.getKey().getName(), property.getValue());
+            }
         }
 
         return edgeToVertex;

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java
index ff437d9..fd23d4f 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java
@@ -23,7 +23,7 @@ package org.apache.falcon.metadata;
  */
 public enum RelationshipProperty {
 
-    // vertex property keys - indexed
+    // vertex/edge property keys - indexed
     NAME("name"),
     TYPE("type"),
     TIMESTAMP("timestamp"),
@@ -39,8 +39,10 @@ public enum RelationshipProperty {
     RUN_ID("runId", "current run-id of the instance"),
     STATUS("status", "status of the user workflow instance"),
     WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex: oozie"),
-    USER_SUBFLOW_ID("subflowId", "external id of user workflow");
+    USER_SUBFLOW_ID("subflowId", "external id of user workflow"),
 
+    // instance-entity edge property
+    NOMINAL_TIME("nominalTime");
 
     private final String name;
     private final String description;

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 2497cce..123d63c 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -156,6 +156,10 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 *.falcon.graph.transaction.retry.count=3
 *.falcon.graph.transaction.retry.delay=5
 
+# Avoid acquiring read lock when iterating over large graphs
+# See http://s3.thinkaurelius.com/docs/titan/0.5.4/bdb.html
+*.falcon.graph.storage.transactions=false
+
 # Uncomment and override the following properties for enabling metrics for titan db and pushing them to graphite. You
 # can use other reporters like ganglia also.
 # Refer (http://thinkaurelius.github.io/titan/wikidoc/0.4.2/Titan-Performance-and-Monitoring)for finding the

http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index faaedfa..51a791e 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -169,6 +169,10 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 *.falcon.graph.transaction.retry.count=3
 *.falcon.graph.transaction.retry.delay=5
 
+# Avoid acquiring read lock when iterating over large graphs
+# See http://s3.thinkaurelius.com/docs/titan/0.5.4/bdb.html
+*.falcon.graph.storage.transactions=false
+
 # Uncomment and override the following properties for enabling metrics for titan db and pushing them to graphite. You
 # can use other reporters like ganglia also.
 # Refer (http://thinkaurelius.github.io/titan/wikidoc/0.4.2/Titan-Performance-and-Monitoring)for finding the