You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/01/29 12:47:50 UTC

falcon git commit: FALCON-1750 Null Pointer Exception while listening to Workflow Notifi…

Repository: falcon
Updated Branches:
  refs/heads/master c3912dead -> ae08745f1


FALCON-1750 Null Pointer Exception while listening to Workflow Notifi…

…cations

Author: Pallavi Rao <pa...@inmobi.com>

Reviewers: Sandeep Samudrala <sa...@inmobi.com>, Pavan Kolamuri <pa...@inmobi.com>

Closes #17 from pallavi-rao/master and squashes the following commits:

ae8dacb [Pallavi Rao] FALCON-1750 Addressed review comments
9e95e12 [Pallavi Rao] Merge remote-tracking branch 'upstream/master'
b04519e [Pallavi Rao] FALCON-1750 Null Pointer Exception while listening to Workflow Notifications


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ae08745f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ae08745f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ae08745f

Branch: refs/heads/master
Commit: ae08745f10cfbfa5df5a7f797ac04bded6daf3fa
Parents: c3912de
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Fri Jan 29 17:17:05 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Fri Jan 29 17:17:05 2016 +0530

----------------------------------------------------------------------
 .../WorkflowJobEndNotificationService.java      | 105 ++++++++++---------
 .../WorkflowJobEndNotificationServiceTest.java  |   4 +-
 .../falcon/messaging/JMSMessageConsumer.java    |   2 +-
 .../messaging/JMSMessageConsumerTest.java       |   7 ++
 .../falcon/rerun/handler/RetryHandler.java      |   4 +
 5 files changed, 70 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/ae08745f/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index 630c56c..faea25c 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -21,10 +21,9 @@ package org.apache.falcon.workflow;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.service.FalconService;
@@ -54,8 +53,6 @@ public class WorkflowJobEndNotificationService implements FalconService {
     // Maintain a cache of context built, so we don't have to query Oozie for every state change.
     private Map<String, Properties> contextMap = new ConcurrentHashMap<>();
 
-    private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
-
     @Override
     public String getName() {
         return SERVICE_NAME;
@@ -97,17 +94,19 @@ public class WorkflowJobEndNotificationService implements FalconService {
         listeners.remove(listener);
     }
 
-    public void notifyFailure(WorkflowExecutionContext context) {
+    public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
         notifyWorkflowEnd(context);
     }
 
-    public void notifySuccess(WorkflowExecutionContext context) {
+    public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
         notifyWorkflowEnd(context);
     }
 
-    public void notifyStart(WorkflowExecutionContext context) {
+    public void notifyStart(WorkflowExecutionContext context) throws FalconException {
         // Start notifications can only be from Oozie JMS notifications
-        updateContextFromWFConf(context);
+        if (!updateContextFromWFConf(context)) {
+            return;
+        }
         LOG.debug("Sending workflow start notification to listeners with context : {} ", context);
         for (WorkflowExecutionListener listener : listeners) {
             try {
@@ -119,9 +118,11 @@ public class WorkflowJobEndNotificationService implements FalconService {
         }
     }
 
-    public void notifySuspend(WorkflowExecutionContext context) {
+    public void notifySuspend(WorkflowExecutionContext context) throws FalconException {
         // Suspend notifications can only be from Oozie JMS notifications
-        updateContextFromWFConf(context);
+        if (!updateContextFromWFConf(context)) {
+            return;
+        }
         LOG.debug("Sending workflow suspend notification to listeners with context : {} ", context);
         for (WorkflowExecutionListener listener : listeners) {
             try {
@@ -136,9 +137,13 @@ public class WorkflowJobEndNotificationService implements FalconService {
         contextMap.remove(context.getWorkflowId());
     }
 
-    public void notifyWait(WorkflowExecutionContext context) {
+    public void notifyWait(WorkflowExecutionContext context) throws FalconException {
         // Wait notifications can only be from Oozie JMS notifications
-        updateContextFromWFConf(context);
+
+        if (!updateContextFromWFConf(context)) {
+            return;
+        }
+
         LOG.debug("Sending workflow wait notification to listeners with context : {} ", context);
         for (WorkflowExecutionListener listener : listeners) {
             try {
@@ -152,48 +157,48 @@ public class WorkflowJobEndNotificationService implements FalconService {
 
     // The method retrieves the conf from the cache if it is in cache.
     // Else, queries WF Engine to retrieve the conf of the workflow
-    private void updateContextFromWFConf(WorkflowExecutionContext context) {
-        try {
-            Properties wfProps = contextMap.get(context.getWorkflowId());
-            if (wfProps == null) {
-                Entity entity = CONFIG_STORE.get(EntityType.valueOf(context.getEntityType()), context.getEntityName());
-                // Entity can be null in case of delete. Engine will generate notifications for instance kills.
-                // But, the entity would no longer be in the config store.
-                if (entity == null) {
-                    return;
-                }
-                for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
-                    try {
-                        InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity)
-                                .getJobDetails(cluster, context.getWorkflowId()).getInstances();
-                        if (instances != null && instances.length > 0) {
-                            wfProps = getWFProps(instances[0].getWfParams());
-                            // Required by RetryService. But, is not part of conf.
-                            wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(),
-                                    Integer.toString(instances[0].getRunId()));
-                        }
-                    } catch (FalconException e) {
-                        // Do Nothing. The workflow may not have been deployed on this cluster.
-                        continue;
+    private boolean updateContextFromWFConf(WorkflowExecutionContext context) throws FalconException {
+        Properties wfProps = contextMap.get(context.getWorkflowId());
+        if (wfProps == null) {
+            Entity entity = null;
+            try {
+                entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
+            } catch (EntityNotRegisteredException e) {
+                // Entity no longer exists. No need to notify.
+                LOG.debug("Entity {} of type {} doesn't exist in config store. Notification Ignored.",
+                        context.getEntityName(), context.getEntityType());
+                contextMap.remove(context.getWorkflowId());
+                return false;
+            }
+            for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+                try {
+                    InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity)
+                            .getJobDetails(cluster, context.getWorkflowId()).getInstances();
+                    if (instances != null && instances.length > 0) {
+                        wfProps = getWFProps(instances[0].getWfParams());
+                        // Required by RetryService. But, is not part of conf.
+                        wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(),
+                                Integer.toString(instances[0].getRunId()));
                     }
-                    contextMap.put(context.getWorkflowId(), wfProps);
+                } catch (FalconException e) {
+                    // Do Nothing. Move on to the next cluster.
+                    continue;
                 }
+                contextMap.put(context.getWorkflowId(), wfProps);
             }
+        }
 
-            // No extra props to enhance the context with.
-            if (wfProps == null || wfProps.isEmpty()) {
-                return;
-            }
+        // No extra props to enhance the context with.
+        if (wfProps == null || wfProps.isEmpty()) {
+            return true;
+        }
 
-            for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
-                if (wfProps.containsKey(arg.getName())) {
-                    context.setValue(arg, wfProps.getProperty(arg.getName()));
-                }
+        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
+            if (wfProps.containsKey(arg.getName())) {
+                context.setValue(arg, wfProps.getProperty(arg.getName()));
             }
-
-        } catch (FalconException e) {
-            LOG.error("Unable to retrieve entity {} of type {} from config store.", e);
         }
+        return true;
     }
 
     private Properties getWFProps(InstancesResult.KeyValuePair[] wfParams) {
@@ -205,7 +210,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
     }
 
     // This method handles both success and failure notifications.
-    private void notifyWorkflowEnd(WorkflowExecutionContext context) {
+    private void notifyWorkflowEnd(WorkflowExecutionContext context) throws FalconException {
         // Need to distinguish notification from post processing for backward compatibility
         if (context.getContextType() == WorkflowExecutionContext.Type.POST_PROCESSING) {
             boolean engineNotifEnabled = false;
@@ -224,7 +229,9 @@ public class WorkflowJobEndNotificationService implements FalconService {
                 updateContextWithTime(context);
             }
         } else {
-            updateContextFromWFConf(context);
+            if (!updateContextFromWFConf(context)) {
+                return;
+            }
         }
 
         LOG.debug("Sending workflow end notification to listeners with context : {} ", context);

http://git-wip-us.apache.org/repos/asf/falcon/blob/ae08745f/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
index 1a9597b..9dd8f93 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
@@ -127,11 +127,11 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
 
     }
 
-    private void notifyFailure(WorkflowExecutionContext context) {
+    private void notifyFailure(WorkflowExecutionContext context) throws FalconException {
         service.notifyFailure(context);
     }
 
-    private void notifySuccess(WorkflowExecutionContext context) {
+    private void notifySuccess(WorkflowExecutionContext context) throws FalconException {
         service.notifySuccess(context);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/ae08745f/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 2380e47..ccc2cfb 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -189,7 +189,7 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
         return dateFormat.format(time);
     }
 
-    private void invokeListener(WorkflowExecutionContext context) {
+    private void invokeListener(WorkflowExecutionContext context) throws FalconException {
         // Login the user so listeners can access FS and WfEngine as this user
         CurrentUser.authenticate(context.getWorkflowUser());
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/ae08745f/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 5600746..5c53a3e 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -22,6 +22,9 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.BrokerView;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.util.FalconTestUtil;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
@@ -73,6 +76,9 @@ public class JMSMessageConsumerTest {
                 BROKER_URL, TOPIC_NAME + "," + SECONDARY_TOPIC_NAME, jobEndService);
 
         subscriber.startSubscriber();
+        Process mockProcess = new Process();
+        mockProcess.setName("process1");
+        ConfigurationStore.get().publish(EntityType.PROCESS, mockProcess);
     }
 
     public void sendMessages(String topic, WorkflowExecutionContext.Type type)
@@ -278,6 +284,7 @@ public class JMSMessageConsumerTest {
 
     @AfterMethod
     public void tearDown() throws Exception{
+        ConfigurationStore.get().remove(EntityType.PROCESS, "process1");
         broker.stop();
         subscriber.closeSubscriber();
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/ae08745f/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index fe2ceda..fac32b3 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -19,6 +19,7 @@ package org.apache.falcon.rerun.handler;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.Frequency;
@@ -75,6 +76,9 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
                         "All retry attempt failed out of configured: "
                                 + attempts + " attempt for entity instance::");
             }
+        } catch (EntityNotRegisteredException ee) {
+            LOG.warn("Entity {} of type {} doesn't exist in config store. Retry will be skipped.",
+                    entityName, entityType);
         } catch (FalconException e) {
             LOG.error("Error during retry of entity instance {}:{}", entityName, nominalTime, e);
             GenericAlert.alertRetryFailed(entityType, entityName, nominalTime,