You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/05/12 01:09:19 UTC

falcon git commit: FALCON-1926 Filter out effectively non-falcon related JMS messages …

Repository: falcon
Updated Branches:
  refs/heads/master 5e26e11ca -> 98a904c21


FALCON-1926 Filter out effectively non-falcon related JMS messages …

* Falcon to retrieve only the Oozie JMS notifications that belong to Falcon-generated workflows, ignoring notifications generated by other workflows submitted directly to Oozie.

Author: Venkatesan Ramachandran <vr...@hortonworks.com>

Reviewers: "Balu Vellanki <ba...@apache.org>, Ying Zheng <yz...@hortonworks.com>"

Closes #119 from vramachan/FALCON-1923.OpsCoord


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/98a904c2
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/98a904c2
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/98a904c2

Branch: refs/heads/master
Commit: 98a904c2118d368e110b88ff396da86c64490ed8
Parents: 5e26e11
Author: Venkatesan Ramachandran <vr...@hortonworks.com>
Authored: Wed May 11 18:09:16 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Wed May 11 18:09:16 2016 -0700

----------------------------------------------------------------------
 .../falcon/entity/WorkflowNameBuilder.java      |  7 +++
 docs/src/site/twiki/Configuration.twiki         |  2 +-
 .../falcon/messaging/JMSMessageConsumer.java    |  3 +-
 .../messaging/JMSMessageConsumerTest.java       | 47 +++++++++++++++++---
 .../falcon/workflow/FalconPostProcessing.java   | 15 -------
 5 files changed, 51 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/98a904c2/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
index c58be64..f0d6073 100644
--- a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
+++ b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
@@ -34,6 +34,9 @@ import java.util.regex.Pattern;
 public class WorkflowNameBuilder<T extends Entity> {
     private static final String PREFIX = "FALCON";
 
+    // Oozie JMS message property name that holds the workflow app name
+    private static final String OOZIE_JMS_MSG_APPNAME_PROP = "appName";
+
     private T entity;
     private Tag tag;
     private List<String> suffixes;
@@ -153,5 +156,9 @@ public class WorkflowNameBuilder<T extends Entity> {
             }
             return null;
         }
+
+        public static String getJMSFalconSelector() {
+            return String.format("%s like '%s%s%%'", OOZIE_JMS_MSG_APPNAME_PROP, PREFIX, SEPARATOR);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/98a904c2/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index 0df094f..8cf2a64 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -99,7 +99,7 @@ Some Falcon features such as late data handling, retries, metadata service, depe
    * In Falcon runtime.properties, set *.falcon.jms.notification.enabled to false. This will turn off JMS notification in post-processing.
    * Copy notification related properties in oozie/conf/oozie-site.xml to oozie-site.xml of the Oozie installation.  Restart Oozie so changes get reflected.  
 
-*NOTE : If you disable Falcon post-processing JMS notification and not enable Oozie JMS notification, features such as failure retry, late data handling and metadata service will be disabled for all entities on the server.*
+*NOTE : Oozie JMS notification needs to be enabled; otherwise, features such as failure retry, late data handling and metadata service will be disabled for all entities on the server. Please refer to the Falcon documentation on how to configure Oozie for Falcon.*
 
 ---+++Enabling Falcon Native Scheudler
 You can either choose to schedule entities using Oozie's coordinator or using Falcon's native scheduler. To be able to schedule entities natively on Falcon, you will need to add some additional properties to <verbatim>$FALCON_HOME/conf/startup.properties</verbatim> before starting the Falcon Server. For details on the same, refer to [[FalconNativeScheduler][Falcon Native Scheduler]]

http://git-wip-us.apache.org/repos/asf/falcon/blob/98a904c2/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 90bbdd3..8b48e93 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -92,7 +92,8 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
 
             topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Topic destination = topicSession.createTopic(topicName);
-            topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID);
+            topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID,
+                    WorkflowNameBuilder.WorkflowName.getJMSFalconSelector(), false);
             topicSubscriber.setMessageListener(this);
 
             connection.setExceptionListener(this);

http://git-wip-us.apache.org/repos/asf/falcon/blob/98a904c2/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 5c53a3e..0ba9464 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -83,6 +83,11 @@ public class JMSMessageConsumerTest {
 
     public void sendMessages(String topic, WorkflowExecutionContext.Type type)
         throws JMSException, FalconException, IOException {
+        sendMessages(topic, type, true);
+    }
+
+    public void sendMessages(String topic, WorkflowExecutionContext.Type type, boolean isFalconWF)
+        throws JMSException, FalconException, IOException {
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
         Connection connection = connectionFactory.createConnection();
         connection.start();
@@ -100,10 +105,10 @@ public class JMSMessageConsumerTest {
                 message = getMockFalconMessage(i, session);
                 break;
             case WORKFLOW_JOB:
-                message = getMockOozieMessage(i, session);
+                message = getMockOozieMessage(i, session, isFalconWF);
                 break;
             case COORDINATOR_ACTION:
-                message = getMockOozieCoordMessage(i, session);
+                message = getMockOozieCoordMessage(i, session, isFalconWF);
             default:
                 break;
             }
@@ -112,10 +117,15 @@ public class JMSMessageConsumerTest {
         }
     }
 
-    private Message getMockOozieMessage(int i, Session session) throws FalconException, JMSException {
+    private Message getMockOozieMessage(int i, Session session, boolean isFalconWF)
+        throws FalconException, JMSException {
         TextMessage message = session.createTextMessage();
         message.setStringProperty("appType", "WORKFLOW_JOB");
-        message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
+        if (isFalconWF) {
+            message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
+        } else {
+            message.setStringProperty("appName", "OozieSampleShellWF");
+        }
         message.setStringProperty("user", "falcon");
         switch(i % 4) {
         case 0:
@@ -142,10 +152,15 @@ public class JMSMessageConsumerTest {
         return message;
     }
 
-    private Message getMockOozieCoordMessage(int i, Session session) throws FalconException, JMSException {
+    private Message getMockOozieCoordMessage(int i, Session session, boolean isFalconWF)
+        throws FalconException, JMSException {
         TextMessage message = session.createTextMessage();
         message.setStringProperty("appType", "COORDINATOR_ACTION");
-        message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
+        if (isFalconWF) {
+            message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
+        } else {
+            message.setStringProperty("appName", "OozieSampleShellWF");
+        }
         message.setStringProperty("user", "falcon");
         switch(i % 5) {
         case 0:
@@ -288,4 +303,24 @@ public class JMSMessageConsumerTest {
         broker.stop();
         subscriber.closeSubscriber();
     }
+
+    @Test
+    public void testJMSMessagesFromOozieForNonFalconWF() throws Exception {
+        sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB, false /* isFalconWF */);
+
+        final BrokerView adminView = broker.getAdminView();
+
+        Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+        Assert.assertEquals(adminView.getTotalEnqueueCount(), 10);
+        Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
+        Assert.assertEquals(adminView.getTotalMessageCount(), 0);
+
+        Thread.sleep(100);
+        Mockito.verify(jobEndService, Mockito.never()).notifyStart(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, Mockito.never()).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, Mockito.never()).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, Mockito.never()).notifyWait(Mockito.any(WorkflowExecutionContext.class));
+        Mockito.verify(jobEndService, Mockito.never()).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/98a904c2/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 3bdfe73..4961896 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -48,14 +48,6 @@ public class FalconPostProcessing extends Configured implements Tool {
         // serialize the context to HDFS under logs dir before sending the message
         context.serialize();
 
-        boolean systemNotificationEnabled = Boolean.parseBoolean(context.
-                getValue(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, "true"));
-
-        if (systemNotificationEnabled) {
-            LOG.info("Sending Falcon message {} ", context);
-            invokeFalconMessageProducer(context);
-        }
-
         String userBrokerUrl = context.getValue(WorkflowExecutionArgs.USER_BRKR_URL);
         boolean userNotificationEnabled = Boolean.parseBoolean(context.
                 getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, "true"));
@@ -80,13 +72,6 @@ public class FalconPostProcessing extends Configured implements Tool {
         jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
     }
 
-    private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception {
-        JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
-                .type(JMSMessageProducer.MessageType.FALCON)
-                .build();
-        jmsMessageProducer.sendMessage();
-    }
-
     private void invokeLogProducer(WorkflowExecutionContext context) {
         // todo: need to move this out to Falcon in-process
         if (UserGroupInformation.isSecurityEnabled()) {