Posted to commits@falcon.apache.org by pe...@apache.org on 2016/04/18 10:45:59 UTC

falcon git commit: FALCON-1888 Falcon JMS notification details and example

Repository: falcon
Updated Branches:
  refs/heads/master dc3e729a8 -> bc58b55a1


FALCON-1888 Falcon JMS notification details and example

Updates the operability document to enable Falcon user notifications and example code to read and Map Message from ActiveMQ.

Author: Venkatesan Ramachandran <vr...@hortonworks.com>

Reviewers: Sowmya<sr...@hortonworks.com>, Peeyush<pe...@apache.org>

Closes #101 from vramachan/master and squashes the following commits:

3429177 [Venkatesan Ramachandran] FALCON-1888 : Falcon JMS system and user notification details and sample code
766c130 [Venkatesan Ramachandran] FALCON-1888 : Falcon JMS system and user notification details and sample code
a1fe26e [Venkatesan Ramachandran] git rebase from upstream master to ramachan fork master
3cfc4e0 [Venkatesan Ramachandran] FALCON-1888 : Falcon JMS details and usage
fac28b1 [Venkatesan Ramachandran] Merge
afa5173 [Venkatesan Ramachandran] FALCON-1838:Export instances are not added graph db for lineage tracking - remove call to findVertex(), refactor method name
a886958 [Venkatesan Ramachandran] FALCON-1838:Export instances are not added graph db for lineage tracking


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bc58b55a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bc58b55a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bc58b55a

Branch: refs/heads/master
Commit: bc58b55a195a52a7656e2cbd813043676662ace6
Parents: dc3e729
Author: Venkatesan Ramachandran <vr...@hortonworks.com>
Authored: Mon Apr 18 14:15:39 2016 +0530
Committer: peeyush b <pb...@hortonworks.com>
Committed: Mon Apr 18 14:15:39 2016 +0530

----------------------------------------------------------------------
 docs/src/site/twiki/Operability.twiki | 147 +++++++++++++++++++++++++----
 1 file changed, 131 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/bc58b55a/docs/src/site/twiki/Operability.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Operability.twiki b/docs/src/site/twiki/Operability.twiki
index 05850c1..616af36 100644
--- a/docs/src/site/twiki/Operability.twiki
+++ b/docs/src/site/twiki/Operability.twiki
@@ -72,22 +72,137 @@ or send alerts according to their requirements.
 
 ---++ Notifications
 
-Falcon creates a JMS topic for every process/feed that is scheduled in Falcon.
-The implementation class and the broker url of the JMS engine are read from the dependent cluster's definition.
-Users may register consumers on the required topic to check the availability or status of feed instances.
-
-For a given process that is scheduled, the name of the topic is same as the process name.
-Falcon sends a Map message for every feed produced by the instance of a process to the JMS topic.
-The JMS !MapMessage sent to a topic has the following properties:
-entityName, feedNames, feedInstancePath, workflowId, runId, nominalTime, timeStamp, brokerUrl, brokerImplClass, entityType, operation, logFile, topicName, status, brokerTTL;
-
-For a given feed that is scheduled, the name of the topic is same as the feed name.
-Falcon sends a map message for every feed instance that is deleted/archived/replicated depending upon the retention policy set in the feed definition.
-The JMS !MapMessage sent to a topic has the following properties:
-entityName, feedNames, feedInstancePath, workflowId, runId, nominalTime, timeStamp, brokerUrl, brokerImplClass, entityType, operation, logFile, topicName, status, brokerTTL;
-
-The JMS messages are automatically purged after a certain period (default 3 days) by the Falcon JMS house-keeping service.TTL (Time-to-live) for JMS message
-can be configured in the Falcon's startup.properties file.
+Falcon has two types of notifications - System and User notifications.
+
+---+++ System notifications
+System notifications are generated internally and used by Falcon to monitor the workflow jobs that it orchestrates.
+By default, Falcon starts an embedded ActiveMQ JMS server as a daemon on the Falcon machine, on port 61616. Alternatively,
+users can make Falcon use an existing JMS server instead of starting an embedded instance by completing the
+following two steps:
+
+   * Set the property broker.url in startup.properties as shown below
+<verbatim>
+   *.broker.url=tcp://jms-server-host:61616
+</verbatim>
+   * Set the system property falcon.embeddedmq to false as shown below
+<verbatim>
+   <FALCON-INSTALL-DIR>/bin/falcon-start -Dfalcon.embeddedmq=false
+</verbatim>
+
+Falcon uses FALCON.ENTITY.TOPIC to publish system notifications. This topic and the Map Message fields are internal
+and could change between releases.
+
+---+++ User notifications
+
+In addition to FALCON.ENTITY.TOPIC, Falcon creates a JMS topic for every process/feed that is scheduled in
+Falcon as part of User notifications. To enable User notifications, the broker url and implementation class of the JMS
+engine need to be specified in the cluster definition associated with the feed/process. Users may register consumers
+on the required topic to check the availability or status of feed instances. The JMS broker instance used for User
+notifications can be the same as the one used for System notifications, or a different one.
+
+The name of the JMS topic is the process/feed name prefixed with FALCON. (for example, FALCON.feed-sample for a feed
+named feed-sample). Falcon sends a map message to the JMS topic for every feed instance that is
+created/deleted/replicated/imported/exported. The JMS Map Message sent to a topic has the following fields:
+
+   1. cluster - name of the current cluster the feed/process is dependent on.
+   1. entityType - type of the entity (feed or process).
+   1. entityName - name of the entity.
+   1. nominalTime - instance time (or data date).
+   1. operation - operation performed, such as generate, delete, replicate, import, or export.
+   1. feedNames - names of the feeds which are generated/replicated/deleted/imported/exported.
+   1. feedInstancePaths - comma separated feed instance paths.
+   1. workflowId - current workflow-id of the instance.
+   1. workflowUser - user who owns the feed instance (i.e. partition).
+   1. runId - current run-id of the instance.
+   1. status - status of the user workflow instance.
+   1. timeStamp - current timestamp.
+   1. logDir - log dir where lineage can be recorded.
+
+The JMS messages are automatically purged after a certain period (default 3 days) by the Falcon JMS house-keeping
+service. The TTL (time-to-live) for JMS messages can be configured in Falcon's startup.properties file.
+
+The following example shows how to enable and read user notifications by connecting to the JMS broker.
+
+First, specify the JMS broker url in the cluster definition XML as shown below.
+
+<verbatim>
+
+<?xml version="1.0"?>
+<!-- filename : primaryCluster.xml -->
+<cluster colo="USWestOregon" description="oregonHadoopCluster" name="primaryCluster" xmlns="uri:falcon:cluster:0.1">
+    <interfaces>
+        ...
+        ...
+        <interface type="messaging" endpoint="tcp://user-jms-broker-host:61616?daemon=true" version="5.1.6" />
+        ...
+    </interfaces>
+</cluster>
+
+</verbatim>
+
+Next, use a JMS consumer (example below in Java) to read messages from the topic named
+FALCON.<feed-or-process-name>
+
+<verbatim>
+import org.apache.activemq.ActiveMQConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+// Sample consumer that prints the fields of Falcon user notification messages.
+public class FalconUserJMSClient {
+    public static void main(String[] args) throws Exception {
+        // Note: specify the JMS broker URL
+        String brokerUrl = "tcp://localhost:61616";
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+        Connection connection = connectionFactory.createConnection();
+        try {
+            connection.setClientID("Falcon User JMS Consumer");
+            // Non-transacted session; received messages are acknowledged automatically.
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Note: the topic name for the feed will be FALCON.<feed-name>
+            Topic falconTopic = session.createTopic("FALCON.feed-sample");
+            MessageConsumer consumer = session.createConsumer(falconTopic);
+            // Start delivery of messages to the consumer.
+            connection.start();
+            while (true) {
+                // Blocks until the next notification arrives on the topic.
+                MapMessage msg = (MapMessage) consumer.receive();
+                System.out.println("cluster             : " + msg.getString("cluster"));
+                System.out.println("entityType          : " + msg.getString("entityType"));
+                System.out.println("entityName          : " + msg.getString("entityName"));
+                System.out.println("nominalTime         : " + msg.getString("nominalTime"));
+                System.out.println("operation           : " + msg.getString("operation"));
+
+                System.out.println("feedNames           : " + msg.getString("feedNames"));
+                System.out.println("feedInstancePaths   : " + msg.getString("feedInstancePaths"));
+
+                System.out.println("workflowId          : " + msg.getString("workflowId"));
+                System.out.println("workflowUser        : " + msg.getString("workflowUser"));
+                System.out.println("runId               : " + msg.getString("runId"));
+                System.out.println("status              : " + msg.getString("status"));
+                System.out.println("timeStamp           : " + msg.getString("timeStamp"));
+                System.out.println("logDir              : " + msg.getString("logDir"));
+
+                // getString() returns null for any field that is not present in the message.
+                System.out.println("brokerUrl           : " + msg.getString("brokerUrl"));
+                System.out.println("brokerImplClass     : " + msg.getString("brokerImplClass"));
+                System.out.println("logFile             : " + msg.getString("logFile"));
+                System.out.println("topicName           : " + msg.getString("topicName"));
+                System.out.println("brokerTTL           : " + msg.getString("brokerTTL"));
+            }
+        } finally {
+            // Closing the connection also closes its sessions and consumers.
+            connection.close();
+        }
+    }
+}
+</verbatim>
 
 
 ---++ Alerts