You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/04 23:41:19 UTC

[1/2] git commit: FALCON-644 Falcon message producer masks errors in Post processing. Contributed by Venkatesh Seetharam

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 6f770a34d -> 9aad374ab


FALCON-644 Falcon message producer masks errors in Post processing. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/331ad145
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/331ad145
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/331ad145

Branch: refs/heads/master
Commit: 331ad145e172c4a862bb294578dfb333b39d28d6
Parents: 6f770a3
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Thu Sep 4 14:40:03 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Thu Sep 4 14:40:03 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  5 ++-
 .../falcon/messaging/JMSMessageConsumer.java    | 36 ++++++----------
 .../falcon/messaging/JMSMessageProducer.java    | 12 ++----
 .../messaging/JMSMessageProducerTest.java       | 17 ++++++++
 .../org/apache/falcon/aspect/GenericAlert.java  |  7 ++++
 .../falcon/workflow/FalconPostProcessing.java   |  7 +++-
 .../workflow/FalconPostProcessingTest.java      | 44 ++++++++++++++++++++
 7 files changed, 95 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/331ad145/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3e9dcf6..1abbb67 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,7 +77,10 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
-   FALCON-338 - late data recording is enabled by default for all feeds 
+   FALCON-644 Falcon message producer masks errors in Post processing
+   (Venkatesh Seetharam)
+
+   FALCON-338 - late data recording is enabled by default for all feeds
    irrespective of late arrival config (Ajay Yadav via Suhas Vasu)
 
    FALCON-652 EntityUtils tests are failing (Ajay Yadav via Venkatesh Seetharam)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/331ad145/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 573f5bd..ec7bd93 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -20,10 +20,12 @@ package org.apache.falcon.messaging;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -44,19 +46,18 @@ import java.util.Map;
  * Subscribes to the falcon topic for handling retries and alerts.
  */
 public class JMSMessageConsumer implements MessageListener, ExceptionListener {
-    private static final Logger LOG = Logger.getLogger(JMSMessageConsumer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumer.class);
 
     private final String implementation;
     private final String userName;
     private final String password;
     private final String url;
     private final String topicName;
+    private final WorkflowJobEndNotificationService jobEndNotificationService;
 
     private Connection connection;
     private TopicSubscriber subscriber;
 
-    private final WorkflowJobEndNotificationService jobEndNotificationService;
-
     public JMSMessageConsumer(String implementation, String userName,
                               String password, String url, String topicName,
                               WorkflowJobEndNotificationService jobEndNotificationService) {
@@ -87,22 +88,20 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
     @Override
     public void onMessage(Message message) {
         MapMessage mapMessage = (MapMessage) message;
+        LOG.info("Received message {}", message.toString());
 
         try {
-            if (LOG.isDebugEnabled()) {debug(mapMessage); }
-
             WorkflowExecutionContext context = createContext(mapMessage);
             if (context.hasWorkflowFailed()) {
                 onFailure(context);
             } else if (context.hasWorkflowSucceeded()) {
                 onSuccess(context);
             }
-        } catch (JMSException e) {
-            LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
-        } catch (FalconException e) {
-            LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
         } catch (Exception e) {
-            LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
+            String errorMessage = "Error in onMessage for subscriber of topic: "
+                    + topicName + ", Message: " + message.toString();
+            LOG.info(errorMessage, e);
+            GenericAlert.alertJMSMessageConsumerFailed(errorMessage, e);
         }
     }
 
@@ -127,20 +126,11 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
         jobEndNotificationService.notifySuccess(context);
     }
 
-    private void debug(MapMessage mapMessage) throws JMSException {
-        StringBuilder buff = new StringBuilder();
-        buff.append("Received:{");
-        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
-            buff.append(arg.getName()).append('=')
-                .append(mapMessage.getString(arg.getName())).append(", ");
-        }
-        buff.append("}");
-        LOG.debug(buff);
-    }
-
     @Override
     public void onException(JMSException ignore) {
-        LOG.info("Error in onException for subscriber of topic: " + this.toString(), ignore);
+        String errorMessage = "Error in onException for subscriber of topic: " + topicName;
+        LOG.info(errorMessage, ignore);
+        GenericAlert.alertJMSMessageConsumerFailed(errorMessage, ignore);
     }
 
     public void closeSubscriber() throws FalconException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/331ad145/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index fc31bab..0181e74 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -143,9 +143,9 @@ public class JMSMessageProducer {
      * Sends all arguments.
      *
      * @return error code
-     * @throws JMSException
+     * @throws Exception if sending the message fails; errors are propagated, not masked
      */
-    public int sendMessage() throws JMSException {
+    public int sendMessage() throws Exception {
         return sendMessage(WorkflowExecutionArgs.values());
     }
 
@@ -156,9 +156,9 @@ public class JMSMessageProducer {
      *
      * @param filteredArgs args sent in the message.
      * @return error code
-     * @throws JMSException
+     * @throws Exception if sending the message fails; errors are propagated, not masked
      */
-    public int sendMessage(WorkflowExecutionArgs[] filteredArgs) throws JMSException {
+    public int sendMessage(WorkflowExecutionArgs[] filteredArgs) throws Exception {
         List<Map<String, String>> messageList = buildMessageList(filteredArgs);
 
         if (messageList.isEmpty()) {
@@ -174,10 +174,6 @@ public class JMSMessageProducer {
                 LOG.info("Sending message: {}", message);
                 sendMessage(connection, message);
             }
-        } catch (JMSException e) {
-            LOG.error("Error in getConnection:", e);
-        } catch (Exception e) {
-            LOG.error("Error in getConnection:", e);
         } finally {
             closeQuietly(connection);
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/331ad145/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
index e4ea22f..bf8615f 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
@@ -215,4 +215,21 @@ public class JMSMessageProducerTest {
                 "2012-01-01-01-00");
         Assert.assertEquals(message.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
     }
+
+    @Test (expectedExceptions = JMSException.class)
+    public void testFailuresInSendMessagesAreNotMasked() throws Exception {
+        List<String> args = new ArrayList<String>(Arrays.asList(
+                "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
+                "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "raw-logs",
+                "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
+                "/raw-logs/10/05/05/00/20",
+                "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "error"));
+        args.addAll(createCommonArgs());
+
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(
+                args.toArray(new String[args.size()]), WorkflowExecutionContext.Type.POST_PROCESSING);
+        JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+                .type(JMSMessageProducer.MessageType.FALCON).build();
+        jmsMessageProducer.sendMessage();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/331ad145/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
index 5ab2f72..d2019b5 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
@@ -26,6 +26,7 @@ import org.aspectj.lang.annotation.Aspect;
  * Create a method with params you want to monitor via Aspect and log in metric
  * and iMon, invoke this method from code.
  */
+@SuppressWarnings("UnusedParameters")
 @Aspect
 public final class GenericAlert {
 
@@ -107,6 +108,12 @@ public final class GenericAlert {
             @Dimension(value = "message") String message,
             @Dimension(value = "exception") Throwable throwable) {
         return "IGNORE";
+    }
 
+    @Monitored(event = "jms-message-consumer-failed")
+    public static String alertJMSMessageConsumerFailed(
+            @Dimension(value = "error-message") String errorMessage,
+            @Dimension(value = "exception") Throwable throwable) {
+        return "IGNORE";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/331ad145/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 0adc11b..e5b3704 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -80,6 +80,11 @@ public class FalconPostProcessing extends Configured implements Tool {
             return;
         }
 
-        new JobLogMover().run(context);
+        try {
+            new JobLogMover().run(context);
+        } catch (Exception ignored) {
+            // Mask the exception: a failed log mover must not fail the user workflow
+            LOG.error("Exception in job log mover:", ignored);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/331ad145/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 201b682..87d5e8a 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -154,4 +154,48 @@ public class FalconPostProcessingTest {
         Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()), "2012-01-01-01-00");
         Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
     }
+
+    @Test (expectedExceptions = JMSException.class)
+    public void testFailuresInSendMessagesAreNotMasked() throws Exception {
+        try {
+            broker.stop();
+        } catch (Exception ignored) {
+            // ignore
+        } finally {
+            new FalconPostProcessing().run(getMessageArgs());
+        }
+    }
+
+    private String[] getMessageArgs() {
+        return new String[]{
+            "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "out-click-logs,out-raw-logs",
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
+            "/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20",
+            "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+            "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
+            "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+            "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), "2011-01-01-01-00",
+            "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), "2012-01-01-01-00",
+            "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "error",
+            "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
+            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "error",
+            "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
+            "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
+            "-" + WorkflowExecutionArgs.OPERATION.getName(), "GENERATE",
+            "-" + WorkflowExecutionArgs.LOG_FILE.getName(), "/logFile",
+            "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+            "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "10",
+            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), "corp",
+            "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie/",
+            "-" + WorkflowExecutionArgs.LOG_DIR.getName(), "target/log",
+            "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id" + "test",
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), "oozie",
+            "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), "in-click-logs,in-raw-logs",
+            "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(),
+            "/in-click-logs/10/05/05/00/20,/in-raw-logs/10/05/05/00/20",
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), "test-workflow",
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), "1.0.0",
+        };
+    }
 }


[2/2] git commit: FALCON-669 Missing optional workflow execution listeners configuration results in NPE. Contributed by Raghav Kumar Gautam

Posted by ve...@apache.org.
FALCON-669 Missing optional workflow execution listeners configuration results in NPE. Contributed by Raghav Kumar Gautam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/9aad374a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/9aad374a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/9aad374a

Branch: refs/heads/master
Commit: 9aad374ab75ca477ada47c4bd2b0aaacd656602a
Parents: 331ad14
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Thu Sep 4 14:41:06 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Thu Sep 4 14:41:06 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                                     | 3 +++
 .../falcon/workflow/WorkflowJobEndNotificationService.java      | 5 +++++
 2 files changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9aad374a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1abbb67..4a3bbc4 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,6 +77,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-669 Missing optional workflow execution listeners configuration
+   results in NPE (Raghav Kumar Gautam via Venkatesh Seetharam)
+
    FALCON-644 Falcon message producer masks errors in Post processing
    (Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9aad374a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index 67f6c79..fb2d58d 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.workflow;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -54,6 +55,10 @@ public class WorkflowJobEndNotificationService implements FalconService {
     public void init() throws FalconException {
         String listenerClassNames = StartupProperties.get().getProperty(
                 "workflow.execution.listeners");
+        if (StringUtils.isEmpty(listenerClassNames)) {
+            return;
+        }
+
         for (String listenerClassName : listenerClassNames.split(",")) {
             listenerClassName = listenerClassName.trim();
             if (listenerClassName.isEmpty()) {