You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/08/18 00:05:58 UTC

falcon git commit: FALCON-1776 Process instance update in titan DB based on JMS notifications on coordinator actions

Repository: falcon
Updated Branches:
  refs/heads/master 3a7c993b5 -> fc27ebb84


FALCON-1776 Process instance update in titan DB based on JMS notifications on coordinator actions

In JMS notifications from the coordinator action, it is not guaranteed to provide information on timestamp and user workflow version. Need to allow these properties to be optional when adding process instance to titan DB. Also need to address the different format when there is no input/output feed. See the picture below for an example result (i.e. WAITING instance).

<img width="1083" alt="screen shot 2016-08-12 at 4 50 59 pm" src="https://cloud.githubusercontent.com/assets/10202347/17640302/aec16864-60b1-11e6-9a38-b6dc5cd8b890.png">

Author: yzheng-hortonworks <yz...@hortonworks.com>

Reviewers: "sandeep <sa...@gmail.com>, Peeyush<pe...@apache.org>, Balu <ba...@apache.org>"

Closes #263 from yzheng-hortonworks/FALCON-1776


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fc27ebb8
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fc27ebb8
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fc27ebb8

Branch: refs/heads/master
Commit: fc27ebb84b692a7d6e961b1a1e00821ad2a3df51
Parents: 3a7c993
Author: yzheng-hortonworks <yz...@hortonworks.com>
Authored: Wed Aug 17 17:05:53 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Wed Aug 17 17:05:53 2016 -0700

----------------------------------------------------------------------
 .../metadata/InstanceRelationshipGraphBuilder.java  | 16 +++++++++-------
 .../falcon/metadata/MetadataMappingService.java     |  8 +++++---
 .../falcon/metadata/RelationshipGraphBuilder.java   |  8 +++++---
 .../falcon/workflow/WorkflowExecutionContext.java   |  4 ++++
 .../workflow/WorkflowJobEndNotificationService.java |  3 +++
 5 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/fc27ebb8/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index f9cd2b9..4256611 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -76,8 +76,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         String processInstanceName = getProcessInstanceName(context);
         LOG.info("Adding process instance: {}", processInstanceName);
 
-        Vertex processInstance = addVertex(processInstanceName,
-                RelationshipType.PROCESS_INSTANCE, context.getTimeStampAsLong());
+        Vertex processInstance = addVertex(processInstanceName, RelationshipType.PROCESS_INSTANCE,
+                context.hasTimeStamp() ? context.getTimeStampAsLong() : null);
         addWorkflowInstanceProperties(processInstance, context);
 
         Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context);
@@ -123,8 +123,10 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
             addProperty(processInstance, context, instanceWorkflowProperty);
         }
 
-        processInstance.setProperty(RelationshipProperty.VERSION.getName(),
-                context.getUserWorkflowVersion());
+        if (context.getUserWorkflowVersion() != null) {
+            processInstance.setProperty(RelationshipProperty.VERSION.getName(),
+                    context.getUserWorkflowVersion());
+        }
     }
 
     private void addProperty(Vertex vertex, WorkflowExecutionContext context,
@@ -169,7 +171,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
     public void addOutputFeedInstances(WorkflowExecutionContext context,
                                        Vertex processInstance) throws FalconException {
         String outputFeedNamesArg = context.getOutputFeedNames();
-        if (NONE.equals(outputFeedNamesArg) || IGNORE.equals(outputFeedNamesArg)) {
+        if (outputFeedNamesArg == null || NONE.equals(outputFeedNamesArg) || IGNORE.equals(outputFeedNamesArg)) {
             return; // there are no output feeds for this process
         }
 
@@ -187,7 +189,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
     public void addInputFeedInstances(WorkflowExecutionContext context,
                                       Vertex processInstance) throws FalconException {
         String inputFeedNamesArg = context.getInputFeedNames();
-        if (NONE.equals(inputFeedNamesArg) || IGNORE.equals(inputFeedNamesArg)) {
+        if (inputFeedNamesArg == null || NONE.equals(inputFeedNamesArg) || IGNORE.equals(inputFeedNamesArg)) {
             return; // there are no input feeds for this process
         }
 
@@ -311,7 +313,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
                                    String clusterName, boolean hasEdgeProperties) throws FalconException {
         LOG.info("Adding feed instance {}", feedInstanceName);
         Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
-                context.getTimeStampAsLong());
+                context.hasTimeStamp() ? context.getTimeStampAsLong() : null);
         feedInstance.setProperty(RelationshipProperty.STATUS.getName(), context.getValue(WorkflowExecutionArgs.STATUS));
 
         if (hasEdgeProperties) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/fc27ebb8/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 225e44a..aee19de 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -394,12 +394,14 @@ public class MetadataMappingService
     }
 
     private void updateInstanceStatus(final WorkflowExecutionContext context) throws FalconException {
-        if (context.getContextType() == WorkflowExecutionContext.Type.COORDINATOR_ACTION) {
-            // TODO(yzheng): FALCON-1776 Instance update on titan DB based on JMS notifications on coordinator actions
+        WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation();
+        if (context.getContextType() == WorkflowExecutionContext.Type.COORDINATOR_ACTION
+                && entityOperation != WorkflowExecutionContext.EntityOperations.GENERATE) {
+            // TODO(yzheng): FALCON-2114 Feed Instance update on titan DB
+            //               based on JMS notifications on coordinator actions
             return;
         }
 
-        WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation();
         switch (entityOperation) {
         case GENERATE:
             updateProcessInstance(context);

http://git-wip-us.apache.org/repos/asf/falcon/blob/fc27ebb8/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
index 32caca3..1746c1a 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
@@ -74,7 +74,7 @@ public abstract class RelationshipGraphBuilder {
         return createVertex(name, type);
     }
 
-    protected Vertex addVertex(String name, RelationshipType type, long timestamp) {
+    protected Vertex addVertex(String name, RelationshipType type, Long timestamp) {
         Vertex vertex = findVertex(name, type);
         if (vertex != null) {
             LOG.debug("Found an existing vertex for: name={}, type={}", name, type);
@@ -98,13 +98,15 @@ public abstract class RelationshipGraphBuilder {
         return createVertex(name, type, System.currentTimeMillis());
     }
 
-    protected Vertex createVertex(String name, RelationshipType type, long timestamp) {
+    protected Vertex createVertex(String name, RelationshipType type, Long timestamp) {
         LOG.debug("Creating a new vertex for: name={}, type={}", name, type);
 
         Vertex vertex = graph.addVertex(null);
         vertex.setProperty(RelationshipProperty.NAME.getName(), name);
         vertex.setProperty(RelationshipProperty.TYPE.getName(), type.getName());
-        vertex.setProperty(RelationshipProperty.TIMESTAMP.getName(), timestamp);
+        if (timestamp != null) {
+            vertex.setProperty(RelationshipProperty.TIMESTAMP.getName(), timestamp);
+        }
 
         return vertex;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/fc27ebb8/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 9b011b8..cccbe3b 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -186,6 +186,10 @@ public class WorkflowExecutionContext {
         return getValue(WorkflowExecutionArgs.TIMESTAMP);
     }
 
+    public boolean hasTimeStamp() {
+        return containsKey(WorkflowExecutionArgs.TIMESTAMP);
+    }
+
     /**
      * Returns timestamp as a long.
      * @return Date as long (milliseconds since epoch) for the timestamp.

http://git-wip-us.apache.org/repos/asf/falcon/blob/fc27ebb8/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index 6d1332e..e6e1458 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -139,6 +139,9 @@ public class WorkflowJobEndNotificationService implements FalconService {
 
     public void notifyWait(WorkflowExecutionContext context) throws FalconException {
         // Wait notifications can only be from Oozie JMS notifications
+        if (!updateContextFromWFConf(context)) {
+            return;
+        }
         LOG.debug("Sending workflow wait notification to listeners with context : {} ", context);
         for (WorkflowExecutionListener listener : listeners) {
             try {