You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/18 01:56:56 UTC
[2/4] git commit: FALCON-665 Handle message consumption failures in JMSMessageConsumer. Contributed by Venkatesh Seetharam
FALCON-665 Handle message consumption failures in JMSMessageConsumer. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/caa22842
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/caa22842
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/caa22842
Branch: refs/heads/master
Commit: caa22842913419e1a3acab292ca7c5396d639d60
Parents: df6f1d8
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 14:35:22 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:56 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../workflow/WorkflowExecutionContext.java | 5 ++
.../WorkflowJobEndNotificationService.java | 25 +++----
.../WorkflowJobEndNotificationServiceTest.java | 12 +---
.../falcon/messaging/JMSMessageConsumer.java | 75 ++++++++++++++------
.../messaging/JMSMessageConsumerTest.java | 16 ++++-
.../org/apache/falcon/aspect/GenericAlert.java | 10 +--
7 files changed, 92 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5f638a6..824bddd 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,9 @@ Trunk (Unreleased)
FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
IMPROVEMENTS
+ FALCON-665 Handle message consumption failures in JMSMessageConsumer
+ (Venkatesh Seetharam)
+
FALCON-662 Fetch relationships for a given type API (Balu Vellanki via
Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 9c7b395..04ef037 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -285,6 +285,11 @@ public class WorkflowExecutionContext {
}
}
+ @Override
+ public String toString() {
+ return "WorkflowExecutionContext{" + context.toString() + "}";
+ }
+
@SuppressWarnings("unchecked")
public static WorkflowExecutionContext deSerialize(String contextFile) throws FalconException {
try {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index fb2d58d..90b66c8 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -82,7 +82,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
listeners.remove(listener);
}
- public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
+ public void notifyFailure(WorkflowExecutionContext context) {
for (WorkflowExecutionListener listener : listeners) {
try {
listener.onFailure(context);
@@ -95,7 +95,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
instrumentAlert(context);
}
- public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
+ public void notifySuccess(WorkflowExecutionContext context) {
for (WorkflowExecutionListener listener : listeners) {
try {
listener.onSuccess(context);
@@ -108,7 +108,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
instrumentAlert(context);
}
- private void instrumentAlert(WorkflowExecutionContext context) throws FalconException {
+ private void instrumentAlert(WorkflowExecutionContext context) {
String clusterName = context.getClusterName();
String entityName = context.getEntityName();
String entityType = context.getEntityType();
@@ -118,14 +118,14 @@ public class WorkflowJobEndNotificationService implements FalconService {
String nominalTime = context.getNominalTimeAsISO8601();
String runId = String.valueOf(context.getWorkflowRunId());
- CurrentUser.authenticate(context.getWorkflowUser());
- AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
- InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);
- Date startTime = result.getInstances()[0].startTime;
- Date endTime = result.getInstances()[0].endTime;
- Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
-
try {
+ CurrentUser.authenticate(context.getWorkflowUser());
+ AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
+ InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);
+ Date startTime = result.getInstances()[0].startTime;
+ Date endTime = result.getInstances()[0].endTime;
+ Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
+
if (context.hasWorkflowFailed()) {
GenericAlert.instrumentFailedInstance(clusterName, entityType,
entityName, nominalTime, workflowId, workflowUser, runId, operation,
@@ -135,8 +135,9 @@ public class WorkflowJobEndNotificationService implements FalconService {
entityName, nominalTime, workflowId, workflowUser, runId, operation,
SchemaHelper.formatDateUTC(startTime), duration);
}
- } catch (Exception e) {
- throw new FalconException(e);
+ } catch (FalconException e) {
+ // Logging an error and ignoring since there are listeners for extensions
+ LOG.error("Instrumenting alert failed for: " + context, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
index 9a6ad98..b7df443 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
@@ -109,19 +109,11 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
}
private void notifyFailure(WorkflowExecutionContext context) {
- try {
- service.notifyFailure(context);
- } catch (FalconException ignored) {
- // do nothing
- }
+ service.notifyFailure(context);
}
private void notifySuccess(WorkflowExecutionContext context) {
- try {
- service.notifySuccess(context);
- } catch (FalconException ignored) {
- // do nothing
- }
+ service.notifySuccess(context);
}
private static String[] getTestMessageArgs() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index ec7bd93..4a0bc2a 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -48,6 +48,8 @@ import java.util.Map;
public class JMSMessageConsumer implements MessageListener, ExceptionListener {
private static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumer.class);
+ private static final String FALCON_CLIENT_ID = "falcon-server";
+
private final String implementation;
private final String userName;
private final String password;
@@ -56,7 +58,8 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
private final WorkflowJobEndNotificationService jobEndNotificationService;
private Connection connection;
- private TopicSubscriber subscriber;
+ private TopicSession topicSession;
+ private TopicSubscriber topicSubscriber;
public JMSMessageConsumer(String implementation, String userName,
String password, String url, String topicName,
@@ -72,15 +75,17 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
public void startSubscriber() throws FalconException {
try {
connection = createAndGetConnection(implementation, userName, password, url);
- TopicSession session = (TopicSession) connection.createSession(
- false, Session.AUTO_ACKNOWLEDGE);
- Topic destination = session.createTopic(topicName);
- subscriber = session.createSubscriber(destination);
- subscriber.setMessageListener(this);
+ connection.setClientID(FALCON_CLIENT_ID);
+
+ topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic destination = topicSession.createTopic(topicName);
+ topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID);
+ topicSubscriber.setMessageListener(this);
+
connection.setExceptionListener(this);
connection.start();
} catch (Exception e) {
- LOG.error("Error starting subscriber of topic: " + this.toString(), e);
+ LOG.error("Error starting topicSubscriber of topic: " + this.toString(), e);
throw new FalconException(e);
}
}
@@ -88,17 +93,19 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
@Override
public void onMessage(Message message) {
MapMessage mapMessage = (MapMessage) message;
- LOG.info("Received message {}", message.toString());
+ LOG.info("Received JMS message {}", message.toString());
try {
WorkflowExecutionContext context = createContext(mapMessage);
+ LOG.info("Created context from JMS message {}", context);
+
if (context.hasWorkflowFailed()) {
onFailure(context);
} else if (context.hasWorkflowSucceeded()) {
onSuccess(context);
}
- } catch (Exception e) {
- String errorMessage = "Error in onMessage for subscriber of topic: "
+ } catch (JMSException e) {
+ String errorMessage = "Error in onMessage for topicSubscriber of topic: "
+ topicName + ", Message: " + message.toString();
LOG.info(errorMessage, e);
GenericAlert.alertJMSMessageConsumerFailed(errorMessage, e);
@@ -118,33 +125,57 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
return WorkflowExecutionContext.create(wfProperties);
}
- public void onFailure(WorkflowExecutionContext context) throws FalconException {
+ public void onFailure(WorkflowExecutionContext context) {
jobEndNotificationService.notifyFailure(context);
}
- public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+ public void onSuccess(WorkflowExecutionContext context) {
jobEndNotificationService.notifySuccess(context);
}
@Override
public void onException(JMSException ignore) {
- String errorMessage = "Error in onException for subscriber of topic: " + topicName;
+ String errorMessage = "Error in onException for topicSubscriber of topic: " + topicName;
LOG.info(errorMessage, ignore);
GenericAlert.alertJMSMessageConsumerFailed(errorMessage, ignore);
}
- public void closeSubscriber() throws FalconException {
- try {
- LOG.info("Closing subscriber on topic : " + this.topicName);
- if (subscriber != null) {
- subscriber.close();
+ public void closeSubscriber() {
+ LOG.info("Closing topicSubscriber on topic : " + this.topicName);
+ // closing each quietly so client id can be unsubscribed
+ closeTopicSubscriberQuietly();
+ closeTopicSessionQuietly();
+ closeConnectionQuietly();
+ }
+
+ private void closeTopicSubscriberQuietly() {
+ if (topicSubscriber != null) {
+ try {
+ topicSubscriber.close();
+ } catch (JMSException ignore) {
+ LOG.error("Error closing JMS topic subscriber: " + topicSubscriber, ignore);
}
- if (connection != null) {
+ }
+ }
+
+ private void closeTopicSessionQuietly() {
+ if (topicSession != null) { // unsubscribe the durable topic topicSubscriber
+ try {
+ topicSession.unsubscribe(FALCON_CLIENT_ID);
+ topicSession.close();
+ } catch (JMSException ignore) {
+ LOG.error("Error closing JMS topic session: " + topicSession, ignore);
+ }
+ }
+ }
+
+ private void closeConnectionQuietly() {
+ if (connection != null) {
+ try {
connection.close();
+ } catch (JMSException ignore) {
+ LOG.error("Error closing JMS connection: " + connection, ignore);
}
- } catch (JMSException e) {
- LOG.error("Error closing subscriber of topic: " + this.toString(), e);
- throw new FalconException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 9a4a6f7..974116d 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.falcon.messaging;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.falcon.FalconException;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
@@ -59,6 +60,7 @@ public class JMSMessageConsumerTest {
broker.setDataDirectory("target/activemq");
broker.setBrokerName("localhost");
broker.start();
+ broker.deleteAllMessages();
}
public void sendMessages(String topic) throws JMSException, FalconException, IOException {
@@ -137,11 +139,19 @@ public class JMSMessageConsumerTest {
BROKER_URL, TOPIC_NAME+","+SECONDARY_TOPIC_NAME, new WorkflowJobEndNotificationService());
subscriber.startSubscriber();
sendMessages(TOPIC_NAME);
- Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 9);
+
+ final BrokerView adminView = broker.getAdminView();
+
+ Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+ Assert.assertEquals(adminView.getTotalEnqueueCount(), 11);
+ Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
sendMessages(SECONDARY_TOPIC_NAME);
- Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 17);
- Assert.assertEquals(broker.getAdminView().getTotalConsumerCount(), 2);
+
+ Assert.assertEquals(adminView.getTotalEnqueueCount(), 20);
+ Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+ Assert.assertEquals(adminView.getTotalConsumerCount(), 3);
+
subscriber.closeSubscriber();
} catch (Exception e) {
Assert.fail("This should not have thrown an exception.", e);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
index d2019b5..d8efc37 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
@@ -55,7 +55,6 @@ public final class GenericAlert {
@Dimension(value = "run-id") String runId,
@Dimension(value = "error-message") String message) {
return "IGNORE";
-
}
@Monitored(event = "wf-instance-failed")
@@ -71,8 +70,7 @@ public final class GenericAlert {
@Dimension(value = "start-time") String startTime,
@Dimension(value = "error-message") String errorMessage,
@Dimension(value = "message") String message,
- @TimeTaken long timeTaken)
- throws Exception {
+ @TimeTaken long timeTaken) {
return "IGNORE";
}
@@ -88,8 +86,7 @@ public final class GenericAlert {
@Dimension(value = "run-id") String runId,
@Dimension(value = "operation") String operation,
@Dimension(value = "start-time") String startTime,
- @TimeTaken long timeTaken)
- throws Exception {
+ @TimeTaken long timeTaken) {
return "IGNORE";
}
@@ -100,7 +97,6 @@ public final class GenericAlert {
@Dimension(value = "message") String message,
@Dimension(value = "exception") Exception exception) {
return "IGNORE";
-
}
@Monitored(event = "log-cleanup-service-failed")
@@ -112,7 +108,7 @@ public final class GenericAlert {
@Monitored(event = "jms-message-consumer-failed")
public static String alertJMSMessageConsumerFailed(
- @Dimension(value = "error-message") String errorMessage,
+ @Dimension(value = "message") String message,
@Dimension(value = "exception") Throwable throwable) {
return "IGNORE";
}