You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/18 01:56:56 UTC

[2/4] git commit: FALCON-665 Handle message consumption failures in JMSMessageConsumer. Contributed by Venkatesh Seetharam

FALCON-665 Handle message consumption failures in JMSMessageConsumer. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/caa22842
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/caa22842
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/caa22842

Branch: refs/heads/master
Commit: caa22842913419e1a3acab292ca7c5396d639d60
Parents: df6f1d8
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 14:35:22 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:56 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../workflow/WorkflowExecutionContext.java      |  5 ++
 .../WorkflowJobEndNotificationService.java      | 25 +++----
 .../WorkflowJobEndNotificationServiceTest.java  | 12 +---
 .../falcon/messaging/JMSMessageConsumer.java    | 75 ++++++++++++++------
 .../messaging/JMSMessageConsumerTest.java       | 16 ++++-
 .../org/apache/falcon/aspect/GenericAlert.java  | 10 +--
 7 files changed, 92 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5f638a6..824bddd 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,9 @@ Trunk (Unreleased)
    FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
 
   IMPROVEMENTS
+   FALCON-665 Handle message consumption failures in JMSMessageConsumer
+   (Venkatesh Seetharam)
+
    FALCON-662 Fetch relationships for a given type API (Balu Vellanki via
    Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 9c7b395..04ef037 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -285,6 +285,11 @@ public class WorkflowExecutionContext {
         }
     }
 
+    @Override
+    public String toString() {
+        return "WorkflowExecutionContext{" + context.toString() + "}";
+    }
+
     @SuppressWarnings("unchecked")
     public static WorkflowExecutionContext deSerialize(String contextFile) throws FalconException {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index fb2d58d..90b66c8 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -82,7 +82,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
         listeners.remove(listener);
     }
 
-    public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
+    public void notifyFailure(WorkflowExecutionContext context) {
         for (WorkflowExecutionListener listener : listeners) {
             try {
                 listener.onFailure(context);
@@ -95,7 +95,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
         instrumentAlert(context);
     }
 
-    public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
+    public void notifySuccess(WorkflowExecutionContext context) {
         for (WorkflowExecutionListener listener : listeners) {
             try {
                 listener.onSuccess(context);
@@ -108,7 +108,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
         instrumentAlert(context);
     }
 
-    private void instrumentAlert(WorkflowExecutionContext context) throws FalconException {
+    private void instrumentAlert(WorkflowExecutionContext context) {
         String clusterName = context.getClusterName();
         String entityName = context.getEntityName();
         String entityType = context.getEntityType();
@@ -118,14 +118,14 @@ public class WorkflowJobEndNotificationService implements FalconService {
         String nominalTime = context.getNominalTimeAsISO8601();
         String runId = String.valueOf(context.getWorkflowRunId());
 
-        CurrentUser.authenticate(context.getWorkflowUser());
-        AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
-        InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);
-        Date startTime = result.getInstances()[0].startTime;
-        Date endTime = result.getInstances()[0].endTime;
-        Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
-
         try {
+            CurrentUser.authenticate(context.getWorkflowUser());
+            AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
+            InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);
+            Date startTime = result.getInstances()[0].startTime;
+            Date endTime = result.getInstances()[0].endTime;
+            Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
+
             if (context.hasWorkflowFailed()) {
                 GenericAlert.instrumentFailedInstance(clusterName, entityType,
                         entityName, nominalTime, workflowId, workflowUser, runId, operation,
@@ -135,8 +135,9 @@ public class WorkflowJobEndNotificationService implements FalconService {
                         entityName, nominalTime, workflowId, workflowUser, runId, operation,
                         SchemaHelper.formatDateUTC(startTime), duration);
             }
-        } catch (Exception e) {
-            throw new FalconException(e);
+        } catch (FalconException e) {
+            // Logging an error and ignoring since there are listeners for extensions
+            LOG.error("Instrumenting alert failed for: " + context, e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
index 9a6ad98..b7df443 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
@@ -109,19 +109,11 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
     }
 
     private void notifyFailure(WorkflowExecutionContext context) {
-        try {
-            service.notifyFailure(context);
-        } catch (FalconException ignored) {
-            // do nothing
-        }
+        service.notifyFailure(context);
     }
 
     private void notifySuccess(WorkflowExecutionContext context) {
-        try {
-            service.notifySuccess(context);
-        } catch (FalconException ignored) {
-            // do nothing
-        }
+        service.notifySuccess(context);
     }
 
     private static String[] getTestMessageArgs() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index ec7bd93..4a0bc2a 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -48,6 +48,8 @@ import java.util.Map;
 public class JMSMessageConsumer implements MessageListener, ExceptionListener {
     private static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumer.class);
 
+    private static final String FALCON_CLIENT_ID = "falcon-server";
+
     private final String implementation;
     private final String userName;
     private final String password;
@@ -56,7 +58,8 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
     private final WorkflowJobEndNotificationService jobEndNotificationService;
 
     private Connection connection;
-    private TopicSubscriber subscriber;
+    private TopicSession topicSession;
+    private TopicSubscriber topicSubscriber;
 
     public JMSMessageConsumer(String implementation, String userName,
                               String password, String url, String topicName,
@@ -72,15 +75,17 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
     public void startSubscriber() throws FalconException {
         try {
             connection = createAndGetConnection(implementation, userName, password, url);
-            TopicSession session = (TopicSession) connection.createSession(
-                    false, Session.AUTO_ACKNOWLEDGE);
-            Topic destination = session.createTopic(topicName);
-            subscriber = session.createSubscriber(destination);
-            subscriber.setMessageListener(this);
+            connection.setClientID(FALCON_CLIENT_ID);
+
+            topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic destination = topicSession.createTopic(topicName);
+            topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID);
+            topicSubscriber.setMessageListener(this);
+
             connection.setExceptionListener(this);
             connection.start();
         } catch (Exception e) {
-            LOG.error("Error starting subscriber of topic: " + this.toString(), e);
+            LOG.error("Error starting topicSubscriber of topic: " + this.toString(), e);
             throw new FalconException(e);
         }
     }
@@ -88,17 +93,19 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
     @Override
     public void onMessage(Message message) {
         MapMessage mapMessage = (MapMessage) message;
-        LOG.info("Received message {}", message.toString());
+        LOG.info("Received JMS message {}", message.toString());
 
         try {
             WorkflowExecutionContext context = createContext(mapMessage);
+            LOG.info("Created context from JMS message {}", context);
+
             if (context.hasWorkflowFailed()) {
                 onFailure(context);
             } else if (context.hasWorkflowSucceeded()) {
                 onSuccess(context);
             }
-        } catch (Exception e) {
-            String errorMessage = "Error in onMessage for subscriber of topic: "
+        } catch (JMSException e) {
+            String errorMessage = "Error in onMessage for topicSubscriber of topic: "
                     + topicName + ", Message: " + message.toString();
             LOG.info(errorMessage, e);
             GenericAlert.alertJMSMessageConsumerFailed(errorMessage, e);
@@ -118,33 +125,57 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
         return WorkflowExecutionContext.create(wfProperties);
     }
 
-    public void onFailure(WorkflowExecutionContext context) throws FalconException {
+    public void onFailure(WorkflowExecutionContext context) {
         jobEndNotificationService.notifyFailure(context);
     }
 
-    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+    public void onSuccess(WorkflowExecutionContext context) {
         jobEndNotificationService.notifySuccess(context);
     }
 
     @Override
     public void onException(JMSException ignore) {
-        String errorMessage = "Error in onException for subscriber of topic: " + topicName;
+        String errorMessage = "Error in onException for topicSubscriber of topic: " + topicName;
         LOG.info(errorMessage, ignore);
         GenericAlert.alertJMSMessageConsumerFailed(errorMessage, ignore);
     }
 
-    public void closeSubscriber() throws FalconException {
-        try {
-            LOG.info("Closing subscriber on topic : " + this.topicName);
-            if (subscriber != null) {
-                subscriber.close();
+    public void closeSubscriber() {
+        LOG.info("Closing topicSubscriber on topic : " + this.topicName);
+        // closing each quietly so client id can be unsubscribed
+        closeTopicSubscriberQuietly();
+        closeTopicSessionQuietly();
+        closeConnectionQuietly();
+    }
+
+    private void closeTopicSubscriberQuietly() {
+        if (topicSubscriber != null) {
+            try {
+                topicSubscriber.close();
+            } catch (JMSException ignore) {
+                LOG.error("Error closing JMS topic subscriber: " + topicSubscriber, ignore);
             }
-            if (connection != null) {
+        }
+    }
+
+    private void closeTopicSessionQuietly() {
+        if (topicSession != null) { // unsubscribe the durable topic topicSubscriber
+            try {
+                topicSession.unsubscribe(FALCON_CLIENT_ID);
+                topicSession.close();
+            } catch (JMSException ignore) {
+                LOG.error("Error closing JMS topic session: " + topicSession, ignore);
+            }
+        }
+    }
+
+    private void closeConnectionQuietly() {
+        if (connection != null) {
+            try {
                 connection.close();
+            } catch (JMSException ignore) {
+                LOG.error("Error closing JMS connection: " + connection, ignore);
             }
-        } catch (JMSException e) {
-            LOG.error("Error closing subscriber of topic: " + this.toString(), e);
-            throw new FalconException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 9a4a6f7..974116d 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.falcon.messaging;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerView;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
@@ -59,6 +60,7 @@ public class JMSMessageConsumerTest {
         broker.setDataDirectory("target/activemq");
         broker.setBrokerName("localhost");
         broker.start();
+        broker.deleteAllMessages();
     }
 
     public void sendMessages(String topic) throws JMSException, FalconException, IOException {
@@ -137,11 +139,19 @@ public class JMSMessageConsumerTest {
                     BROKER_URL, TOPIC_NAME+","+SECONDARY_TOPIC_NAME, new WorkflowJobEndNotificationService());
             subscriber.startSubscriber();
             sendMessages(TOPIC_NAME);
-            Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 9);
+
+            final BrokerView adminView = broker.getAdminView();
+
+            Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+            Assert.assertEquals(adminView.getTotalEnqueueCount(), 11);
+            Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
 
             sendMessages(SECONDARY_TOPIC_NAME);
-            Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 17);
-            Assert.assertEquals(broker.getAdminView().getTotalConsumerCount(), 2);
+
+            Assert.assertEquals(adminView.getTotalEnqueueCount(), 20);
+            Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+            Assert.assertEquals(adminView.getTotalConsumerCount(), 3);
+
             subscriber.closeSubscriber();
         } catch (Exception e) {
             Assert.fail("This should not have thrown an exception.", e);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
index d2019b5..d8efc37 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
@@ -55,7 +55,6 @@ public final class GenericAlert {
             @Dimension(value = "run-id") String runId,
             @Dimension(value = "error-message") String message) {
         return "IGNORE";
-
     }
 
     @Monitored(event = "wf-instance-failed")
@@ -71,8 +70,7 @@ public final class GenericAlert {
             @Dimension(value = "start-time") String startTime,
             @Dimension(value = "error-message") String errorMessage,
             @Dimension(value = "message") String message,
-            @TimeTaken long timeTaken)
-        throws Exception {
+            @TimeTaken long timeTaken) {
 
         return "IGNORE";
     }
@@ -88,8 +86,7 @@ public final class GenericAlert {
             @Dimension(value = "run-id") String runId,
             @Dimension(value = "operation") String operation,
             @Dimension(value = "start-time") String startTime,
-            @TimeTaken long timeTaken)
-        throws Exception {
+            @TimeTaken long timeTaken) {
 
         return "IGNORE";
     }
@@ -100,7 +97,6 @@ public final class GenericAlert {
             @Dimension(value = "message") String message,
             @Dimension(value = "exception") Exception exception) {
         return "IGNORE";
-
     }
 
     @Monitored(event = "log-cleanup-service-failed")
@@ -112,7 +108,7 @@ public final class GenericAlert {
 
     @Monitored(event = "jms-message-consumer-failed")
     public static String alertJMSMessageConsumerFailed(
-            @Dimension(value = "error-message") String errorMessage,
+            @Dimension(value = "message") String message,
             @Dimension(value = "exception") Throwable throwable) {
         return "IGNORE";
     }