You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/10/30 13:31:02 UTC

falcon git commit: FALCON-1564 Provide an option for users to disable system post-processing JMS notification

Repository: falcon
Updated Branches:
  refs/heads/0.8 94b755615 -> 243a7dcad


FALCON-1564 Provide an option for users to disable system post-processing JMS notification


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/243a7dca
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/243a7dca
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/243a7dca

Branch: refs/heads/0.8
Commit: 243a7dcad3e884a7502b48058bb84ebc001eed6d
Parents: 94b7556
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Fri Oct 30 17:54:08 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Fri Oct 30 17:54:08 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../falcon/workflow/WorkflowExecutionArgs.java  |  1 +
 .../WorkflowJobEndNotificationService.java      |  3 +-
 docs/src/site/twiki/Configuration.twiki         |  6 +++-
 .../falcon/oozie/OozieCoordinatorBuilder.java   |  7 +++-
 .../falcon/workflow/FalconPostProcessing.java   | 15 +++++++++
 .../workflow/engine/OozieWorkflowEngine.java    |  2 +-
 .../src/main/resources/action/post-process.xml  |  2 ++
 .../workflow/FalconPostProcessingTest.java      | 34 ++++++++++++++++++++
 src/conf/runtime.properties                     |  4 +++
 10 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d57a8ec..898f8e0 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,8 @@ Release version: 0.8
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
+    FALCON-1564 Provide an option for users to disable system post-processing JMS notification (Pallavi Rao)
+
     FALCON-1519 Suspend And Resume API's in Falcon Unit(Narayan Periwal via Pallavi Rao)
 
     FALCON-1524 Improve Lifecycle Retention validation checks(Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index ac7140c..4318620 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -77,6 +77,7 @@ public enum WorkflowExecutionArgs {
     USER_BRKR_URL("userBrokerUrl", "user broker url", false),
     BRKR_TTL("brokerTTL", "time to live for broker message in sec", false),
     USER_JMS_NOTIFICATION_ENABLED("userJMSNotificationEnabled", "Is User notification via JMS enabled?", false),
+    SYSTEM_JMS_NOTIFICATION_ENABLED("systemJMSNotificationEnabled", "Is system notification via JMS enabled?", false),
 
     // state maintained
     LOG_FILE("logFile", "log file path where feeds to be deleted are recorded", false),

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index 5c75f5c..9d96fa3 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -214,7 +214,8 @@ public class WorkflowJobEndNotificationService implements FalconService {
                 engineNotifEnabled = WorkflowEngineFactory.getWorkflowEngine()
                         .isNotificationEnabled(context.getClusterName(), context.getWorkflowId());
             } catch (FalconException e) {
-                LOG.debug("Unable to determine if the notification is enabled on the wf engine. Assuming not.", e);
+                LOG.debug("Received error while checking if notification is enabled. "
+                        + "Hence, assuming notification is not enabled.");
             }
             // Ignore the message from post processing as there will be one more from Oozie.
             if (engineNotifEnabled) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index 37b5717..743ce40 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -86,7 +86,6 @@ and change it to look as below
 export FALCON_SERVER_OPTS="-Djava.awt.headless=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc="
 </verbatim>
 
-
 ---+++Activemq
 
 * falcon server starts embedded active mq. To control this behaviour, set the following system properties using -D
@@ -95,6 +94,11 @@ option in environment variable FALCON_OPTS:
    * falcon.embeddedmq.port=<port> - Port for embedded active mq, default 61616
    * falcon.embeddedmq.data=<path> - Data path for embedded active mq, default {package dir}/logs/data
 
+---+++Falcon System Notifications
+Some Falcon features, such as late data handling, retries and the metadata service, depend on JMS notifications sent when the Oozie workflow completes. These system notifications are sent as part of the Falcon post-processing action. Because the post-processing action is itself a job, it is prone to failures, and when it fails Falcon is blind to the status of the workflow. To alleviate this problem and make the notifications more reliable, you can enable Oozie's JMS notification feature and disable the Falcon post-processing notification by making the following changes:
+   * In Falcon runtime.properties, set *.falcon.jms.notification.enabled to false. This will turn off JMS notification in post-processing.
+   * Copy the notification-related properties in oozie/conf/oozie-site.xml to the oozie-site.xml of the Oozie installation. Restart Oozie so that the changes take effect.
+
 ---+++Adding Extension Libraries
 
 Library extensions allows users to add custom libraries to entity lifecycles such as feed retention, feed replication

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index a04ae95..acb49ef 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -33,6 +33,7 @@ import org.apache.falcon.oozie.feed.FeedReplicationCoordinatorBuilder;
 import org.apache.falcon.oozie.feed.FeedRetentionCoordinatorBuilder;
 import org.apache.falcon.oozie.process.ProcessExecutionCoordinatorBuilder;
 import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.OozieClient;
@@ -49,7 +50,9 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
     protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
     protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
 
-    private static final Object USER_JMS_NOTIFICATION_ENABLED = "userJMSNotificationEnabled";
+    private static final String USER_JMS_NOTIFICATION_ENABLED = "userJMSNotificationEnabled";
+    private static final String SYSTEM_JMS_NOTIFICATION_ENABLED = "systemJMSNotificationEnabled";
+
     protected final LifeCycle lifecycle;
 
     public OozieCoordinatorBuilder(T entity, LifeCycle lifecycle) {
@@ -111,6 +114,8 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
             new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
                 "${coord:nominalTime()}").getId());
         props.put(USER_JMS_NOTIFICATION_ENABLED, "true");
+        props.put(SYSTEM_JMS_NOTIFICATION_ENABLED,
+                RuntimeProperties.get().getProperty("falcon.jms.notification.enabled", "true"));
 
         return props;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 4961896..3bdfe73 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -48,6 +48,14 @@ public class FalconPostProcessing extends Configured implements Tool {
         // serialize the context to HDFS under logs dir before sending the message
         context.serialize();
 
+        boolean systemNotificationEnabled = Boolean.parseBoolean(context.
+                getValue(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, "true"));
+
+        if (systemNotificationEnabled) {
+            LOG.info("Sending Falcon message {} ", context);
+            invokeFalconMessageProducer(context);
+        }
+
         String userBrokerUrl = context.getValue(WorkflowExecutionArgs.USER_BRKR_URL);
         boolean userNotificationEnabled = Boolean.parseBoolean(context.
                 getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, "true"));
@@ -72,6 +80,13 @@ public class FalconPostProcessing extends Configured implements Tool {
         jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
     }
 
+    private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception {
+        JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+                .type(JMSMessageProducer.MessageType.FALCON)
+                .build();
+        jmsMessageProducer.sendMessage();
+    }
+
     private void invokeLogProducer(WorkflowExecutionContext context) {
         // todo: need to move this out to Falcon in-process
         if (UserGroupInformation.isSecurityEnabled()) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 09c29ab..7262964 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -604,7 +604,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 }
             }
         } catch (OozieClientException e) {
-            LOG.error("Error while retrieving JMS connection info", e);
+            LOG.debug("Error while retrieving JMS connection info", e);
         }
 
         return false;

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml
index df0d286..0f51df7 100644
--- a/oozie/src/main/resources/action/post-process.xml
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -62,6 +62,8 @@
         <arg>${userBrokerUrl}</arg>
         <arg>-userJMSNotificationEnabled</arg>
         <arg>${userJMSNotificationEnabled}</arg>
+        <arg>-systemJMSNotificationEnabled</arg>
+        <arg>${systemJMSNotificationEnabled}</arg>
         <arg>-brokerTTL</arg>
         <arg>${brokerTTL}</arg>
         <arg>-feedNames</arg>

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 4b74368..171068a 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -55,6 +55,7 @@ public class FalconPostProcessingTest {
     private String[] outputFeedNames = {"out-click-logs", "out-raw-logs"};
     private String[] outputFeedPaths = {"/out-click-logs/10/05/05/00/20", "/out-raw-logs/10/05/05/00/20"};
     private String userNotification = "true";
+    private String systemNotification = "true";
 
     @BeforeClass
     public void setup() throws Exception {
@@ -71,6 +72,7 @@ public class FalconPostProcessingTest {
             "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
             "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), userBrokerUrl,
             "-" + WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, userNotification,
+            "-" + WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, systemNotification,
             "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
             "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
             "-" + WorkflowExecutionArgs.OPERATION.getName(), "GENERATE",
@@ -163,6 +165,38 @@ public class FalconPostProcessingTest {
         }
     }
 
+    @Test
+    public void testSystemMessage() throws Exception {
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    // falcon message [FALCON_TOPIC_NAME]
+                    consumer(BROKER_URL, "FALCON.>", false);
+                } catch (AssertionError e) {
+                    error = e;
+                } catch (JMSException ignore) {
+                    error = null;
+                }
+            }
+        };
+        t.start();
+
+        systemNotification = "false";
+        latch.await();
+        new FalconPostProcessing().run(this.args);
+        t.join();
+
+        systemNotification = "true";
+        latch.await();
+        new FalconPostProcessing().run(this.args);
+        t.join();
+
+        if (error != null) {
+            throw error;
+        }
+    }
+
     private void consumer(String brokerUrl, String topic, boolean checkUserMessage) throws JMSException {
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
         Connection connection = connectionFactory.createConnection();

http://git-wip-us.apache.org/repos/asf/falcon/blob/243a7dca/src/conf/runtime.properties
----------------------------------------------------------------------
diff --git a/src/conf/runtime.properties b/src/conf/runtime.properties
index 1260f55..25333fe 100644
--- a/src/conf/runtime.properties
+++ b/src/conf/runtime.properties
@@ -44,6 +44,10 @@ falcon.current.colo=local
 # If true, Falcon skips oozie dryrun while scheduling entities.
 *.falcon.skip.dryrun=false
 
+# If set to false, the post processing action of Oozie workflows will not generate JMS notification for Falcon.
+# If you are setting this to false, ensure you have enabled Oozie JMS notifications in oozie-site.xml
+*.falcon.jms.notification.enabled=true
+
 ######### Proxyuser Configuration Start #########
 
 #List of hosts the '#USER#' user is allowed to perform 'doAs 'operations from. The '#USER#' must be replaced with the