You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/01/29 12:47:50 UTC
falcon git commit: FALCON-1750 Null Pointer Exception while listening to Workflow Notifi…
Repository: falcon
Updated Branches:
refs/heads/master c3912dead -> ae08745f1
FALCON-1750 Null Pointer Exception while listening to Workflow Notifi…
…cations
Author: Pallavi Rao <pa...@inmobi.com>
Reviewers: Sandeep Samudrala <sa...@inmobi.com>, Pavan Kolamuri <pa...@inmobi.com>
Closes #17 from pallavi-rao/master and squashes the following commits:
ae8dacb [Pallavi Rao] FALCON-1750 Addressed review comments
9e95e12 [Pallavi Rao] Merge remote-tracking branch 'upstream/master'
b04519e [Pallavi Rao] FALCON-1750 Null Pointer Exception while listening to Workflow Notifications
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ae08745f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ae08745f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ae08745f
Branch: refs/heads/master
Commit: ae08745f10cfbfa5df5a7f797ac04bded6daf3fa
Parents: c3912de
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Fri Jan 29 17:17:05 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Fri Jan 29 17:17:05 2016 +0530
----------------------------------------------------------------------
.../WorkflowJobEndNotificationService.java | 105 ++++++++++---------
.../WorkflowJobEndNotificationServiceTest.java | 4 +-
.../falcon/messaging/JMSMessageConsumer.java | 2 +-
.../messaging/JMSMessageConsumerTest.java | 7 ++
.../falcon/rerun/handler/RetryHandler.java | 4 +
5 files changed, 70 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/ae08745f/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index 630c56c..faea25c 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -21,10 +21,9 @@ package org.apache.falcon.workflow;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.service.FalconService;
@@ -54,8 +53,6 @@ public class WorkflowJobEndNotificationService implements FalconService {
// Maintain a cache of context built, so we don't have to query Oozie for every state change.
private Map<String, Properties> contextMap = new ConcurrentHashMap<>();
- private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
-
@Override
public String getName() {
return SERVICE_NAME;
@@ -97,17 +94,19 @@ public class WorkflowJobEndNotificationService implements FalconService {
listeners.remove(listener);
}
- public void notifyFailure(WorkflowExecutionContext context) {
+ public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
notifyWorkflowEnd(context);
}
- public void notifySuccess(WorkflowExecutionContext context) {
+ public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
notifyWorkflowEnd(context);
}
- public void notifyStart(WorkflowExecutionContext context) {
+ public void notifyStart(WorkflowExecutionContext context) throws FalconException {
// Start notifications can only be from Oozie JMS notifications
- updateContextFromWFConf(context);
+ if (!updateContextFromWFConf(context)) {
+ return;
+ }
LOG.debug("Sending workflow start notification to listeners with context : {} ", context);
for (WorkflowExecutionListener listener : listeners) {
try {
@@ -119,9 +118,11 @@ public class WorkflowJobEndNotificationService implements FalconService {
}
}
- public void notifySuspend(WorkflowExecutionContext context) {
+ public void notifySuspend(WorkflowExecutionContext context) throws FalconException {
// Suspend notifications can only be from Oozie JMS notifications
- updateContextFromWFConf(context);
+ if (!updateContextFromWFConf(context)) {
+ return;
+ }
LOG.debug("Sending workflow suspend notification to listeners with context : {} ", context);
for (WorkflowExecutionListener listener : listeners) {
try {
@@ -136,9 +137,13 @@ public class WorkflowJobEndNotificationService implements FalconService {
contextMap.remove(context.getWorkflowId());
}
- public void notifyWait(WorkflowExecutionContext context) {
+ public void notifyWait(WorkflowExecutionContext context) throws FalconException {
// Wait notifications can only be from Oozie JMS notifications
- updateContextFromWFConf(context);
+
+ if (!updateContextFromWFConf(context)) {
+ return;
+ }
+
LOG.debug("Sending workflow wait notification to listeners with context : {} ", context);
for (WorkflowExecutionListener listener : listeners) {
try {
@@ -152,48 +157,48 @@ public class WorkflowJobEndNotificationService implements FalconService {
// The method retrieves the conf from the cache if it is in cache.
// Else, queries WF Engine to retrieve the conf of the workflow
- private void updateContextFromWFConf(WorkflowExecutionContext context) {
- try {
- Properties wfProps = contextMap.get(context.getWorkflowId());
- if (wfProps == null) {
- Entity entity = CONFIG_STORE.get(EntityType.valueOf(context.getEntityType()), context.getEntityName());
- // Entity can be null in case of delete. Engine will generate notifications for instance kills.
- // But, the entity would no longer be in the config store.
- if (entity == null) {
- return;
- }
- for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
- try {
- InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity)
- .getJobDetails(cluster, context.getWorkflowId()).getInstances();
- if (instances != null && instances.length > 0) {
- wfProps = getWFProps(instances[0].getWfParams());
- // Required by RetryService. But, is not part of conf.
- wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(),
- Integer.toString(instances[0].getRunId()));
- }
- } catch (FalconException e) {
- // Do Nothing. The workflow may not have been deployed on this cluster.
- continue;
+ private boolean updateContextFromWFConf(WorkflowExecutionContext context) throws FalconException {
+ Properties wfProps = contextMap.get(context.getWorkflowId());
+ if (wfProps == null) {
+ Entity entity = null;
+ try {
+ entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
+ } catch (EntityNotRegisteredException e) {
+ // Entity no longer exists. No need to notify.
+ LOG.debug("Entity {} of type {} doesn't exist in config store. Notification Ignored.",
+ context.getEntityName(), context.getEntityType());
+ contextMap.remove(context.getWorkflowId());
+ return false;
+ }
+ for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+ try {
+ InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity)
+ .getJobDetails(cluster, context.getWorkflowId()).getInstances();
+ if (instances != null && instances.length > 0) {
+ wfProps = getWFProps(instances[0].getWfParams());
+ // Required by RetryService. But, is not part of conf.
+ wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(),
+ Integer.toString(instances[0].getRunId()));
}
- contextMap.put(context.getWorkflowId(), wfProps);
+ } catch (FalconException e) {
+ // Do Nothing. Move on to the next cluster.
+ continue;
}
+ contextMap.put(context.getWorkflowId(), wfProps);
}
+ }
- // No extra props to enhance the context with.
- if (wfProps == null || wfProps.isEmpty()) {
- return;
- }
+ // No extra props to enhance the context with.
+ if (wfProps == null || wfProps.isEmpty()) {
+ return true;
+ }
- for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
- if (wfProps.containsKey(arg.getName())) {
- context.setValue(arg, wfProps.getProperty(arg.getName()));
- }
+ for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
+ if (wfProps.containsKey(arg.getName())) {
+ context.setValue(arg, wfProps.getProperty(arg.getName()));
}
-
- } catch (FalconException e) {
- LOG.error("Unable to retrieve entity {} of type {} from config store.", e);
}
+ return true;
}
private Properties getWFProps(InstancesResult.KeyValuePair[] wfParams) {
@@ -205,7 +210,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
}
// This method handles both success and failure notifications.
- private void notifyWorkflowEnd(WorkflowExecutionContext context) {
+ private void notifyWorkflowEnd(WorkflowExecutionContext context) throws FalconException {
// Need to distinguish notification from post processing for backward compatibility
if (context.getContextType() == WorkflowExecutionContext.Type.POST_PROCESSING) {
boolean engineNotifEnabled = false;
@@ -224,7 +229,9 @@ public class WorkflowJobEndNotificationService implements FalconService {
updateContextWithTime(context);
}
} else {
- updateContextFromWFConf(context);
+ if (!updateContextFromWFConf(context)) {
+ return;
+ }
}
LOG.debug("Sending workflow end notification to listeners with context : {} ", context);
http://git-wip-us.apache.org/repos/asf/falcon/blob/ae08745f/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
index 1a9597b..9dd8f93 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
@@ -127,11 +127,11 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
}
- private void notifyFailure(WorkflowExecutionContext context) {
+ private void notifyFailure(WorkflowExecutionContext context) throws FalconException {
service.notifyFailure(context);
}
- private void notifySuccess(WorkflowExecutionContext context) {
+ private void notifySuccess(WorkflowExecutionContext context) throws FalconException {
service.notifySuccess(context);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/ae08745f/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 2380e47..ccc2cfb 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -189,7 +189,7 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
return dateFormat.format(time);
}
- private void invokeListener(WorkflowExecutionContext context) {
+ private void invokeListener(WorkflowExecutionContext context) throws FalconException {
// Login the user so listeners can access FS and WfEngine as this user
CurrentUser.authenticate(context.getWorkflowUser());
http://git-wip-us.apache.org/repos/asf/falcon/blob/ae08745f/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 5600746..5c53a3e 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -22,6 +22,9 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.util.FalconTestUtil;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
@@ -73,6 +76,9 @@ public class JMSMessageConsumerTest {
BROKER_URL, TOPIC_NAME + "," + SECONDARY_TOPIC_NAME, jobEndService);
subscriber.startSubscriber();
+ Process mockProcess = new Process();
+ mockProcess.setName("process1");
+ ConfigurationStore.get().publish(EntityType.PROCESS, mockProcess);
}
public void sendMessages(String topic, WorkflowExecutionContext.Type type)
@@ -278,6 +284,7 @@ public class JMSMessageConsumerTest {
@AfterMethod
public void tearDown() throws Exception{
+ ConfigurationStore.get().remove(EntityType.PROCESS, "process1");
broker.stop();
subscriber.closeSubscriber();
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/ae08745f/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index fe2ceda..fac32b3 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -19,6 +19,7 @@ package org.apache.falcon.rerun.handler;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.Frequency;
@@ -75,6 +76,9 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
"All retry attempt failed out of configured: "
+ attempts + " attempt for entity instance::");
}
+ } catch (EntityNotRegisteredException ee) {
+ LOG.warn("Entity {} of type {} doesn't exist in config store. Retry will be skipped.",
+ entityName, entityType);
} catch (FalconException e) {
LOG.error("Error during retry of entity instance {}:{}", entityName, nominalTime, e);
GenericAlert.alertRetryFailed(entityType, entityName, nominalTime,