You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2014/03/31 21:51:31 UTC

git commit: OOZIE-1765 JMS Notifications for Workflows not always on the correct topic (rkanter)

Repository: oozie
Updated Branches:
  refs/heads/master 81f6c747a -> 0b4d3521a


OOZIE-1765 JMS Notifications for Workflows not always on the correct topic (rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0b4d3521
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0b4d3521
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0b4d3521

Branch: refs/heads/master
Commit: 0b4d3521a92ca8f7d4f7e6f91e6470d2a09118bc
Parents: 81f6c74
Author: Robert Kanter <rk...@cloudera.com>
Authored: Mon Mar 31 12:49:28 2014 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Mon Mar 31 12:49:28 2014 -0700

----------------------------------------------------------------------
 .../apache/oozie/service/JMSTopicService.java   | 11 ++++-
 .../oozie/service/TestJMSTopicService.java      | 46 ++++++++++++++++++--
 docs/src/site/twiki/AG_Install.twiki            |  4 +-
 docs/src/site/twiki/DG_JMSNotifications.twiki   | 21 +++++----
 release-log.txt                                 |  1 +
 5 files changed, 67 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/0b4d3521/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/JMSTopicService.java b/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
index 7a7a960..1646a17 100644
--- a/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
+++ b/core/src/main/java/org/apache/oozie/service/JMSTopicService.java
@@ -193,7 +193,16 @@ public class JMSTopicService implements Service {
             }
         }
         else if (appType == AppType.WORKFLOW_JOB || appType == AppType.WORKFLOW_ACTION) {
-            topicName = topicMap.get(JobType.WORKFLOW);
+            topicName = topicMap.get(JobType.WORKFLOW.value);
+            if (appType == AppType.WORKFLOW_ACTION) {
+                id = parentJobId;
+            }
+        }
+        else if (appType == AppType.BUNDLE_JOB || appType == AppType.BUNDLE_ACTION) {
+            topicName = topicMap.get(JobType.BUNDLE.value);
+            if (appType == AppType.BUNDLE_ACTION) {
+                id = parentJobId;
+            }
         }
 
         if (topicName == null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/0b4d3521/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java b/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
index 5f70153..21ca849 100644
--- a/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestJMSTopicService.java
@@ -67,17 +67,25 @@ public class TestJMSTopicService extends XDataTestCase {
         JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
         WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         assertEquals(wfj.getUser(), jmsTopicService.getTopic(wfj.getId()));
+        assertEquals(wfj.getUser(), jmsTopicService.getTopic(AppType.WORKFLOW_JOB, wfj.getUser(), wfj.getId(), null));
         WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
         assertEquals(wfj.getUser(), jmsTopicService.getTopic(wab.getId()));
+        assertEquals(wfj.getUser(), jmsTopicService.getTopic(AppType.WORKFLOW_ACTION, wfj.getUser(), wab.getId(), wab.getWfId()));
         CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
-        assertEquals(wfj.getUser(), jmsTopicService.getTopic(cjb.getId()));
+        assertEquals(cjb.getUser(), jmsTopicService.getTopic(cjb.getId()));
+        assertEquals(cjb.getUser(), jmsTopicService.getTopic(AppType.COORDINATOR_JOB, cjb.getUser(), cjb.getId(), null));
         CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-for-action-input-check.xml", 0);
-        assertEquals(wfj.getUser(), jmsTopicService.getTopic(cab.getId()));
+        assertEquals(cjb.getUser(), jmsTopicService.getTopic(cab.getId()));
+        assertEquals(cjb.getUser(),
+                jmsTopicService.getTopic(AppType.COORDINATOR_ACTION, cjb.getUser(), cab.getId(), cab.getJobId()));
         BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
-        assertEquals(wfj.getUser(), jmsTopicService.getTopic(bjb.getId()));
+        assertEquals(bjb.getUser(), jmsTopicService.getTopic(bjb.getId()));
+        assertEquals(bjb.getUser(), jmsTopicService.getTopic(AppType.BUNDLE_JOB, bjb.getUser(), bjb.getId(), null));
         BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
-        assertEquals(wfj.getUser(), jmsTopicService.getTopic(bab.getBundleActionId()));
+        assertEquals(bjb.getUser(), jmsTopicService.getTopic(bab.getBundleActionId()));
+        assertEquals(bjb.getUser(),
+                jmsTopicService.getTopic(AppType.BUNDLE_ACTION, bjb.getUser(), bab.getBundleActionId(), bab.getBundleId()));
     }
 
     @Test
@@ -91,17 +99,27 @@ public class TestJMSTopicService extends XDataTestCase {
         WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         assertEquals(TOPIC_PREFIX, jmsTopicService.getTopicPrefix());
         assertEquals(TOPIC_PREFIX + wfj.getId(), jmsTopicService.getTopic(wfj.getId()));
+        assertEquals(TOPIC_PREFIX + wfj.getId(), jmsTopicService.getTopic(AppType.WORKFLOW_JOB, wfj.getUser(), wfj.getId(), null));
         WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
         assertEquals(TOPIC_PREFIX + wfj.getId(), jmsTopicService.getTopic(wab.getId()));
+        assertEquals(TOPIC_PREFIX + wfj.getId(),
+                jmsTopicService.getTopic(AppType.WORKFLOW_ACTION, wfj.getUser(), wab.getId(), wab.getWfId()));
         CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
         assertEquals(TOPIC_PREFIX + cjb.getId(), jmsTopicService.getTopic(cjb.getId()));
+        assertEquals(TOPIC_PREFIX + cjb.getId(),
+                jmsTopicService.getTopic(AppType.COORDINATOR_JOB, cjb.getUser(), cjb.getId(), null));
         CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-for-action-input-check.xml", 0);
         assertEquals(TOPIC_PREFIX + cjb.getId(), jmsTopicService.getTopic(cab.getId()));
+        assertEquals(TOPIC_PREFIX + cjb.getId(),
+                jmsTopicService.getTopic(AppType.COORDINATOR_ACTION, cjb.getUser(), cab.getId(), cab.getJobId()));
         BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
         assertEquals(TOPIC_PREFIX + bjb.getId(), jmsTopicService.getTopic(bjb.getId()));
+        assertEquals(TOPIC_PREFIX + bjb.getId(), jmsTopicService.getTopic(AppType.BUNDLE_JOB, bjb.getUser(), bjb.getId(), null));
         BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
         assertEquals(TOPIC_PREFIX + bjb.getId(), jmsTopicService.getTopic(bab.getBundleActionId()));
+        assertEquals(TOPIC_PREFIX + bjb.getId(),
+                jmsTopicService.getTopic(AppType.BUNDLE_ACTION, bjb.getUser(), bab.getBundleActionId(), bab.getBundleId()));
     }
 
     @Test
@@ -116,17 +134,24 @@ public class TestJMSTopicService extends XDataTestCase {
         JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
         WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         assertEquals("workflow", jmsTopicService.getTopic(wfj.getId()));
+        assertEquals("workflow", jmsTopicService.getTopic(AppType.WORKFLOW_JOB, wfj.getUser(), wfj.getId(), null));
         WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
         assertEquals("workflow", jmsTopicService.getTopic(wab.getId()));
+        assertEquals("workflow", jmsTopicService.getTopic(AppType.WORKFLOW_ACTION, wfj.getUser(), wab.getId(), wab.getWfId()));
         CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
         assertEquals("coord", jmsTopicService.getTopic(cjb.getId()));
+        assertEquals("coord", jmsTopicService.getTopic(AppType.COORDINATOR_JOB, cjb.getUser(), cjb.getId(), null));
         CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-for-action-input-check.xml", 0);
         assertEquals("coord", jmsTopicService.getTopic(cab.getId()));
+        assertEquals("coord", jmsTopicService.getTopic(AppType.COORDINATOR_ACTION, cjb.getUser(), cab.getId(), cab.getJobId()));
         BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
         assertEquals("bundle", jmsTopicService.getTopic(bjb.getId()));
+        assertEquals("bundle", jmsTopicService.getTopic(AppType.BUNDLE_JOB, bjb.getUser(), bjb.getId(), null));
         BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
         assertEquals("bundle", jmsTopicService.getTopic(bab.getBundleActionId()));
+        assertEquals("bundle",
+                jmsTopicService.getTopic(AppType.BUNDLE_ACTION, bjb.getUser(), bab.getBundleActionId(), bab.getBundleId()));
     }
 
     @Test
@@ -141,17 +166,24 @@ public class TestJMSTopicService extends XDataTestCase {
         JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
         WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         assertEquals("workflow", jmsTopicService.getTopic(wfj.getId()));
+        assertEquals("workflow", jmsTopicService.getTopic(AppType.WORKFLOW_JOB, wfj.getUser(), wfj.getId(), null));
         WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
         assertEquals("workflow", jmsTopicService.getTopic(wab.getId()));
+        assertEquals("workflow", jmsTopicService.getTopic(AppType.WORKFLOW_ACTION, wfj.getUser(), wab.getId(), wab.getWfId()));
         CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
         assertEquals("coord", jmsTopicService.getTopic(cjb.getId()));
+        assertEquals("coord", jmsTopicService.getTopic(AppType.COORDINATOR_JOB, cjb.getUser(), cjb.getId(), null));
         CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-for-action-input-check.xml", 0);
         assertEquals("coord", jmsTopicService.getTopic(cab.getId()));
+        assertEquals("coord", jmsTopicService.getTopic(AppType.COORDINATOR_ACTION, cjb.getUser(), cab.getId(), cab.getJobId()));
         BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
         assertEquals(bjb.getId(), jmsTopicService.getTopic(bjb.getId()));
+        assertEquals(bjb.getId(), jmsTopicService.getTopic(AppType.BUNDLE_JOB, bjb.getUser(), bjb.getId(), null));
         BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
         assertEquals(bjb.getId(), jmsTopicService.getTopic(bab.getBundleActionId()));
+        assertEquals(bjb.getId(),
+                jmsTopicService.getTopic(AppType.BUNDLE_ACTION, bjb.getUser(), bab.getBundleActionId(), bab.getBundleId()));
     }
 
     @Test
@@ -165,18 +197,24 @@ public class TestJMSTopicService extends XDataTestCase {
         JMSTopicService jmsTopicService = Services.get().get(JMSTopicService.class);
         WorkflowJobBean wfj = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
         assertEquals("workflow", jmsTopicService.getTopic(wfj.getId()));
+        assertEquals("workflow", jmsTopicService.getTopic(AppType.WORKFLOW_JOB, wfj.getUser(), wfj.getId(), null));
         WorkflowActionBean wab = addRecordToWfActionTable(wfj.getId(), "1", WorkflowAction.Status.RUNNING);
         assertEquals("workflow", jmsTopicService.getTopic(wab.getId()));
+        assertEquals("workflow", jmsTopicService.getTopic(AppType.WORKFLOW_ACTION, wfj.getUser(), wab.getId(), wab.getWfId()));
         CoordinatorJobBean cjb = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
         assertEquals("coord", jmsTopicService.getTopic(cjb.getId()));
         CoordinatorActionBean cab = addRecordToCoordActionTable(cjb.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
                 "coord-action-for-action-input-check.xml", 0);
         assertEquals("coord", jmsTopicService.getTopic(cab.getId()));
+        assertEquals("coord", jmsTopicService.getTopic(AppType.COORDINATOR_ACTION, cjb.getUser(), cab.getId(), cab.getJobId()));
         BundleJobBean bjb = addRecordToBundleJobTable(Job.Status.RUNNING, true);
         // As no default is specified, user will be considered as topic
         assertEquals(bjb.getUser(), jmsTopicService.getTopic(bjb.getId()));
+        assertEquals(bjb.getUser(), jmsTopicService.getTopic(AppType.BUNDLE_JOB, bjb.getUser(), bjb.getId(), null));
         BundleActionBean bab = addRecordToBundleActionTable(bjb.getId(), "1", 1, Job.Status.RUNNING);
         assertEquals(bjb.getUser(), jmsTopicService.getTopic(bab.getBundleActionId()));
+        assertEquals(bjb.getUser(),
+                jmsTopicService.getTopic(AppType.BUNDLE_ACTION, bjb.getUser(), bab.getBundleActionId(), bab.getBundleId()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/oozie/blob/0b4d3521/docs/src/site/twiki/AG_Install.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/AG_Install.twiki b/docs/src/site/twiki/AG_Install.twiki
index 41f7c4c..e343d7e 100644
--- a/docs/src/site/twiki/AG_Install.twiki
+++ b/docs/src/site/twiki/AG_Install.twiki
@@ -570,7 +570,7 @@ identifier (e.g. default) assigned to a semi-colon separated key#value list of p
      <property>
         <name>oozie.jms.producer.connection.properties</name>
         <value>
-            default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616
+            java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616;connectionFactoryNames#ConnectionFactory
         </value>
      </property>
      </verbatim>
@@ -584,7 +584,7 @@ about the various jobs.
             default=${username}
         </value>
         <description>
-            Topic options are ${username} or a fixed string which can be specified as default or for a
+            Topic options are ${username}, ${jobId}, or a fixed string which can be specified as default or for a
             particular job type.
             For e.g To have a fixed string topic for workflows, coordinators and bundles,
             specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0b4d3521/docs/src/site/twiki/DG_JMSNotifications.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_JMSNotifications.twiki b/docs/src/site/twiki/DG_JMSNotifications.twiki
index eaa6b7e..098b080 100644
--- a/docs/src/site/twiki/DG_JMSNotifications.twiki
+++ b/docs/src/site/twiki/DG_JMSNotifications.twiki
@@ -50,10 +50,10 @@ String getTopicPrefix();
 </verbatim>
 
 The topic is obtained by concatenating topic prefix and the substituted value for topic pattern. The topic pattern
-can be a constant value like workflow or coordinator which the administrator has configured or ${username}.
-If ${username}, it has to be substitued with the name of the user who has submitted the job. Administrators can chose
-to publish messages to topics containing user names to avoid having one topic containing all messages and all users having
-to apply selectors to filter the message they are interested in.
+can be a constant value like workflow or coordinator which the administrator has configured or a variable (either ${username}
+or ${jobId}). If ${username}, it has to be substituted with the name of the user who has submitted the job; and if ${jobId} it has
+to be substituted with the job Id. Administrators can choose to publish messages to topics containing user names to avoid having one
+topic containing all messages and all users having to apply selectors to filter the message they are interested in.
 
 The getJMSTopicName API can be used if the job id is already known and will give the exact topic name to which the
 notifications for that job are published.
@@ -162,8 +162,7 @@ First, create the Oozie client and retrieve the JNDI properties to make a connec
    Properties jndiProperties = jmsInfo.getJNDIProperties();
    Context jndiContext = new InitialContext(jndiProperties);
    String connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
-   ConnectionFactory connectionFactory = (ConnectionFactory);
-   jndiContext.lookup(connectionFactoryName);
+   ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    String topicPrefix = jmsInfo.getTopicPrefix();
@@ -173,6 +172,10 @@ First, create the Oozie client and retrieve the JNDI properties to make a connec
    String topicName = null;
    if (topicPattern.equals("${username}")) {
        topicName = "john";
+   // If the topic pattern is "${jobId}" instead,
+   // the topic name is set to the job id
+   } else if (topicPattern.equals("${jobId}")) {
+       topicName = "0000004-140328125200198-oozie-oozi-W";
    }
    Destination topic = session.createTopic(topicPrefix+topicName);
    MessageConsumer consumer = session.createConsumer(topic);
@@ -185,12 +188,12 @@ interface needs to be implemented. Also, its onMessage() method  needs to be imp
 This method will be called whenever a message is available on the JMS bus.
 
 <verbatim>
-    public void onMessage(Message m) {
-       if (message.getStringProperty(JMSHeaderConsants.MSG_TYPE).equals(MessageType.SLA.name()){
+    public void onMessage(Message message) {
+       if (message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE).equals(MessageType.SLA.name())){
           SLAMessage slaMessage = JMSMessagingUtils.getEventMessage(message);
           // Further processing
        }
-       else if (message.getStringProperty(JMSHeaderConsants.APP_TYPE).equals(AppType.WORKFLOW_JOB.name()){
+       else if (message.getStringProperty(JMSHeaderConstants.APP_TYPE).equals(AppType.WORKFLOW_JOB.name())){
           WorkflowJobMessage wfJobMessage = JMSMessagingUtils.getEventMessage(message);
           // Further processing
        }

http://git-wip-us.apache.org/repos/asf/oozie/blob/0b4d3521/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a65c8af..1bfa9c2 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1765 JMS Notifications for Workflows not always on the correct topic (rkanter)
 OOZIE-1732 Sharelib instrumentation fails if sharelib.system.libpath is not created (ryota)
 OOZIE-1692 modify log message when checking completion of child job in Map-Reduce action (ryota)
 OOZIE-1734 Oozie returned 500 Internal Server error when user passes invalid request (checha via rkanter)