You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/07/08 09:04:51 UTC

falcon git commit: FALCON-1031 Make post processing notifications to user topics optional. Contributed by Pallavi Rao

Repository: falcon
Updated Branches:
  refs/heads/master 618f717d3 -> e5698fad3


FALCON-1031 Make post processing notifications to user topics optional. Contributed by Pallavi Rao


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/e5698fad
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/e5698fad
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/e5698fad

Branch: refs/heads/master
Commit: e5698fad33c3004fed7ab4d02e82b2a993d525e6
Parents: 618f717
Author: Ajay Yadava <aj...@gmail.com>
Authored: Wed Jul 8 10:38:38 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Wed Jul 8 10:38:38 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/falcon/entity/ClusterHelper.java |  4 +-
 .../entity/parser/ClusterEntityParser.java      | 12 ++++-
 .../falcon/workflow/WorkflowExecutionArgs.java  |  1 +
 .../entity/parser/ClusterEntityParserTest.java  | 13 ++++++
 .../config/cluster/cluster-no-messaging.xml     | 38 +++++++++++++++
 docs/src/site/twiki/EntitySpecification.twiki   |  4 +-
 .../falcon/oozie/OozieCoordinatorBuilder.java   |  2 +
 .../falcon/workflow/FalconPostProcessing.java   | 13 ++++--
 .../src/main/resources/action/post-process.xml  |  2 +
 .../workflow/FalconPostProcessingTest.java      | 49 ++++++++++++++++++--
 11 files changed, 130 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d2d589e..132e064 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
     FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava)
     
   IMPROVEMENTS
+    FALCON-1031 Make post processing notifications to user topics optional (Pallavi Rao via Ajay Yadava)
+    
     FALCON-1186 Add filtering capability to result of instance summary (Suhas Vasu)
 
     FALCON-1293 Update CHANGES.txt to change 0.6.1 branch to release (Shaik Idris Ali via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 49d408f..87b0fba 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -41,6 +41,7 @@ import java.util.Map;
 public final class ClusterHelper {
     public static final String DEFAULT_BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
     public static final String WORKINGDIR = "working";
+    public static final String NO_USER_BROKER_URL = "NA";
 
     private ClusterHelper() {
     }
@@ -90,7 +91,8 @@ public final class ClusterHelper {
     }
 
     public static String getMessageBrokerUrl(Cluster cluster) {
-        return getInterface(cluster, Interfacetype.MESSAGING).getEndpoint();
+        final Interface messageInterface = getInterface(cluster, Interfacetype.MESSAGING);
+        return messageInterface == null ? NO_USER_BROKER_URL : messageInterface.getEndpoint();
     }
 
     public static String getMessageBrokerImplClass(Cluster cluster) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index 5c3ce4f..59b0910 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -64,7 +64,10 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         validateScheme(cluster, Interfacetype.READONLY);
         validateScheme(cluster, Interfacetype.WRITE);
         validateScheme(cluster, Interfacetype.WORKFLOW);
-        validateScheme(cluster, Interfacetype.MESSAGING);
+        // User may choose to disable job completion notifications
+        if (ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING) != null) {
+            validateScheme(cluster, Interfacetype.MESSAGING);
+        }
         if (CatalogServiceFactory.isEnabled()
                 && ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY) != null) {
             validateScheme(cluster, Interfacetype.REGISTRY);
@@ -154,6 +157,13 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
     }
 
     protected void validateMessagingInterface(Cluster cluster) throws ValidationException {
+        // Validate only if user has specified this
+        final Interface messagingInterface = ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING);
+        if (messagingInterface == null) {
+            LOG.info("Messaging service is not enabled for cluster: {}", cluster.getName());
+            return;
+        }
+
         final String messagingUrl = ClusterHelper.getMessageBrokerUrl(cluster);
         final String implementation = StartupProperties.get().getProperty("broker.impl.class",
                 "org.apache.activemq.ActiveMQConnectionFactory");

http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 0a8be64..9456fb9 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -73,6 +73,7 @@ public enum WorkflowExecutionArgs {
     USER_BRKR_IMPL_CLASS("userBrokerImplClass", "user broker Impl class", false),
     USER_BRKR_URL("userBrokerUrl", "user broker url", false),
     BRKR_TTL("brokerTTL", "time to live for broker message in sec", false),
+    USER_JMS_NOTIFICATION_ENABLED("userJMSNotificationEnabled", "Is User notification via JMS enabled?", false),
 
     // state maintained
     LOG_FILE("logFile", "log file path where feeds to be deleted are recorded", false),

http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index b7886bd..638cef9 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -124,6 +124,19 @@ public class ClusterEntityParserTest extends AbstractTestBase {
     }
 
     @Test
+    public void testParseClusterWithoutMessaging() throws FalconException {
+        InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-no-messaging.xml");
+
+        // Parse should be successful
+        Cluster cluster = parser.parse(stream);
+
+        Interface messaging = ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING);
+        Assert.assertNull(messaging);
+
+        Assert.assertEquals(ClusterHelper.getMessageBrokerUrl(cluster), ClusterHelper.NO_USER_BROKER_URL);
+    }
+
+    @Test
     public void testParseClusterWithBadRegistry() throws Exception {
         // disable catalog service
         StartupProperties.get().remove(CatalogServiceFactory.CATALOG_SERVICE);

http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/common/src/test/resources/config/cluster/cluster-no-messaging.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/cluster/cluster-no-messaging.xml b/common/src/test/resources/config/cluster/cluster-no-messaging.xml
new file mode 100644
index 0000000..93e94cb
--- /dev/null
+++ b/common/src/test/resources/config/cluster/cluster-no-messaging.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+<cluster colo="default" description="" name="testCluster" xmlns="uri:falcon:cluster:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+    <interfaces>
+        <interface type="readonly" endpoint="hftp://localhost:50010"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="jail://testCluster:00"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="4.0"/>
+        <interface type="registry" endpoint="http://localhost:48080/templeton/v1"
+                   version="0.11.0"/>
+    </interfaces>
+    <locations>
+        <location name="staging" path="/projects/falcon/staging"/>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+    </locations>
+</cluster>

http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 1ed2cb5..0c1fae2 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -657,10 +657,12 @@ Syntax:
 </process>
 </verbatim>
 
-queueName and jobPriority are special properties, which when present are used by the Falcon's launcher job, the same property is also available in workflow which can be used to propagate to pig or M/R job.
+The following are some special properties which, when present, are used by Falcon's launcher job; the same properties are also available in the workflow and can be propagated to the pig or M/R job.
 <verbatim>
         <property name="queueName" value="hadoopQueue"/>
         <property name="jobPriority" value="VERY_HIGH"/>
+        <!-- This property is used to turn off JMS notifications for this process. JMS notifications are enabled by default. -->
+        <property name="userJMSNotificationEnabled" value="false"/>
 </verbatim>
 
 ---+++ Workflow

http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index e5d75fb..92697b0 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -57,6 +57,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
     protected static final String MR_JOB_PRIORITY = "jobPriority";
 
     protected static final String IGNORE = "IGNORE";
+    private static final Object USER_JMS_NOTIFICATION_ENABLED = "userJMSNotificationEnabled";
     protected final LifeCycle lifecycle;
 
     public OozieCoordinatorBuilder(T entity, LifeCycle lifecycle) {
@@ -132,6 +133,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
 
         props.put(MR_QUEUE_NAME, "default");
         props.put(MR_JOB_PRIORITY, "NORMAL");
+        props.put(USER_JMS_NOTIFICATION_ENABLED, "true");
 
         //props in entity override the set props.
         props.putAll(getEntityProperties(entity));

http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index e5b3704..7557153 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.workflow;
 
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.logging.JobLogMover;
 import org.apache.falcon.messaging.JMSMessageProducer;
 import org.apache.hadoop.conf.Configuration;
@@ -39,15 +40,21 @@ public class FalconPostProcessing extends Configured implements Tool {
 
     @Override
     public int run(String[] args) throws Exception {
-
         WorkflowExecutionContext context = WorkflowExecutionContext.create(args,
                 WorkflowExecutionContext.Type.POST_PROCESSING);
         LOG.info("Post workflow execution context created {}", context);
         // serialize the context to HDFS under logs dir before sending the message
         context.serialize();
 
-        LOG.info("Sending user message {} ", context);
-        invokeUserMessageProducer(context);
+        String userBrokerUrl = context.getValue(WorkflowExecutionArgs.USER_BRKR_URL);
+        boolean userNotificationEnabled = Boolean.parseBoolean(context.
+                getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, "true"));
+
+        if (userBrokerUrl != null && !userBrokerUrl.equals(ClusterHelper.NO_USER_BROKER_URL)
+                && userNotificationEnabled) {
+            LOG.info("Sending user message {} ", context);
+            invokeUserMessageProducer(context);
+        }
 
         // JobLogMover doesn't throw exception, a failed log mover will not fail the user workflow
         LOG.info("Moving logs {}", context);

http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml
index f354351..df0d286 100644
--- a/oozie/src/main/resources/action/post-process.xml
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -60,6 +60,8 @@
         <arg>${userBrokerImplClass}</arg>
         <arg>-userBrokerUrl</arg>
         <arg>${userBrokerUrl}</arg>
+        <arg>-userJMSNotificationEnabled</arg>
+        <arg>${userJMSNotificationEnabled}</arg>
         <arg>-brokerTTL</arg>
         <arg>${brokerTTL}</arg>
         <arg>-feedNames</arg>

http://git-wip-us.apache.org/repos/asf/falcon/blob/e5698fad/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index d59c5e6..0a9aaa0 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -19,6 +19,7 @@ package org.apache.falcon.oozie.workflow;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.workflow.FalconPostProcessing;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.commons.lang3.StringUtils;
@@ -45,12 +46,14 @@ public class FalconPostProcessingTest {
     private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
     private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
     private static final String ENTITY_NAME = "agg-coord";
+    private String userBrokerUrl = BROKER_URL;
     private BrokerService broker;
 
     private volatile AssertionError error;
     private CountDownLatch latch = new CountDownLatch(1);
     private String[] outputFeedNames = {"out-click-logs", "out-raw-logs"};
     private String[] outputFeedPaths = {"/out-click-logs/10/05/05/00/20", "/out-raw-logs/10/05/05/00/20"};
+    private String userNotification = "true";
 
     @BeforeClass
     public void setup() throws Exception {
@@ -65,7 +68,8 @@ public class FalconPostProcessingTest {
             "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), "2012-01-01-01-00",
             "-" + WorkflowExecutionArgs.BRKR_URL.getName(), BROKER_URL,
             "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
-            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), BROKER_URL,
+            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), userBrokerUrl,
+            "-" + WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, userNotification,
             "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
             "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
             "-" + WorkflowExecutionArgs.OPERATION.getName(), "GENERATE",
@@ -105,7 +109,7 @@ public class FalconPostProcessingTest {
             public void run() {
                 try {
                     // falcon message [FALCON_TOPIC_NAME] and user message ["FALCON." + ENTITY_NAME]
-                    consumer(BROKER_URL, "FALCON.>");
+                    consumer(BROKER_URL, "FALCON.>", true);
                 } catch (AssertionError e) {
                     error = e;
                 } catch (JMSException ignore) {
@@ -115,15 +119,50 @@ public class FalconPostProcessingTest {
         };
         t.start();
 
+        userBrokerUrl = BROKER_URL;
+
+        latch.await();
+        new FalconPostProcessing().run(args);
+        t.join();
+        if (error != null) {
+            throw error;
+        }
+    }
+
+    @Test
+    public void testNoUserMessage() throws Exception {
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    // falcon message [FALCON_TOPIC_NAME] and user message ["FALCON." + ENTITY_NAME]
+                    consumer(BROKER_URL, "FALCON.>", false);
+                } catch (AssertionError e) {
+                    error = e;
+                } catch (JMSException ignore) {
+                    error = null;
+                }
+            }
+        };
+        t.start();
+
+        userNotification = "false";
         latch.await();
         new FalconPostProcessing().run(this.args);
         t.join();
+
+        userNotification = "true";
+        userBrokerUrl = ClusterHelper.NO_USER_BROKER_URL;
+        latch.await();
+        new FalconPostProcessing().run(this.args);
+        t.join();
+
         if (error != null) {
             throw error;
         }
     }
 
-    private void consumer(String brokerUrl, String topic) throws JMSException {
+    private void consumer(String brokerUrl, String topic, boolean checkUserMessage) throws JMSException {
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
         Connection connection = connectionFactory.createConnection();
         connection.start();
@@ -135,7 +174,9 @@ public class FalconPostProcessingTest {
         latch.countDown();
 
         // Verify user message
-        verifyMesssage(consumer);
+        if (checkUserMessage) {
+            verifyMesssage(consumer);
+        }
 
         // Verify falcon message
         verifyMesssage(consumer);