You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/08 22:31:35 UTC

[2/8] FALCON-485 - Simplify JMS Message Sender/Consumer and use Workflow Context. Contributed by Venkatesh Seetharam

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index 9f8b07f..95a3780 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -20,13 +20,20 @@ package org.apache.falcon.messaging;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
 
 /**
  * Test for process message producer.
@@ -36,31 +43,36 @@ public class ProcessProducerTest {
     private String[] args;
     private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
     private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
-    private static final String TOPIC_NAME = "FALCON.PROCESS";
+    private static final String ENTITY_NAME = "process1";
     private BrokerService broker;
 
     private volatile AssertionError error;
 
     @BeforeClass
     public void setup() throws Exception {
-        args = new String[]{"-" + ARG.entityName.getArgName(), TOPIC_NAME,
-                            "-" + ARG.feedNames.getArgName(), "click-logs,raw-logs",
-                            "-" + ARG.feedInstancePaths.getArgName(),
-                            "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
-                            "-" + ARG.workflowId.getArgName(), "workflow-01-00",
-                            "-" + ARG.workflowUser.getArgName(), "falcon",
-                            "-" + ARG.runId.getArgName(), "1",
-                            "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
-                            "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
-                            "-" + ARG.brokerUrl.getArgName(), BROKER_URL,
-                            "-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
-                            "-" + ARG.entityType.getArgName(), ("process"),
-                            "-" + ARG.operation.getArgName(), ("GENERATE"),
-                            "-" + ARG.logFile.getArgName(), ("/logFile"),
-                            "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
-                            "-" + ARG.status.getArgName(), ("SUCCEEDED"),
-                            "-" + ARG.brokerTTL.getArgName(), "10",
-                            "-" + ARG.cluster.getArgName(), "corp", };
+        args = new String[] {
+            "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
+            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs,raw-logs",
+            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+            "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
+            "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+            "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
+            "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+            "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), "2011-01-01-01-00",
+            "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), "2012-01-01-01-00",
+            "-" + WorkflowExecutionArgs.BRKR_URL.getName(), BROKER_URL,
+            "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
+            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), BROKER_URL,
+            "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
+            "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
+            "-" + WorkflowExecutionArgs.OPERATION.getName(), "GENERATE",
+            "-" + WorkflowExecutionArgs.LOG_FILE.getName(), "/logFile",
+            "-" + WorkflowExecutionArgs.LOG_DIR.getName(), "/falcon/feed/agg-logs/",
+            "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+            "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "10",
+            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), "corp",
+        };
+
         broker = new BrokerService();
         broker.addConnector(BROKER_URL);
         broker.setDataDirectory("target/activemq");
@@ -92,7 +104,13 @@ public class ProcessProducerTest {
         };
         t.start();
         Thread.sleep(100);
-        new MessageProducer().run(this.args);
+
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(
+                args, WorkflowExecutionContext.Type.POST_PROCESSING);
+        JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+                .type(JMSMessageProducer.MessageType.USER).build();
+        jmsMessageProducer.sendMessage();
+
         t.join();
         if (error != null) {
             throw error;
@@ -100,26 +118,23 @@ public class ProcessProducerTest {
     }
 
     private void consumer() throws JMSException {
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
-                BROKER_URL);
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
         Connection connection = connectionFactory.createConnection();
         connection.start();
 
-        Session session = connection.createSession(false,
-                Session.AUTO_ACKNOWLEDGE);
-        Destination destination = session.createTopic(TOPIC_NAME);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createTopic(getTopicName());
         MessageConsumer consumer = session.createConsumer(destination);
 
-        // wait till you get atleast one message
+        // wait till you get at least one message
         MapMessage m;
         for (m = null; m == null;) {
             m = (MapMessage) consumer.receive();
         }
         System.out.println("Consumed: " + m.toString());
         assertMessage(m);
-        Assert.assertEquals(m.getString(ARG.feedNames.getArgName()),
-                "click-logs");
-        Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()), "click-logs");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
                 "/click-logs/10/05/05/00/20");
 
         for (m = null; m == null;) {
@@ -127,24 +142,27 @@ public class ProcessProducerTest {
         }
         System.out.println("Consumed: " + m.toString());
         assertMessage(m);
-        Assert.assertEquals(m.getString(ARG.feedNames.getArgName()), "raw-logs");
-        Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()), "raw-logs");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
                 "/raw-logs/10/05/05/00/20");
         connection.close();
     }
 
+    private String getTopicName() {
+        return JMSMessageProducer.FALCON_TOPIC_PREFIX + ENTITY_NAME;
+    }
+
     private void assertMessage(MapMessage m) throws JMSException {
-        Assert.assertEquals(m.getString(ARG.entityName.getArgName()),
-                TOPIC_NAME);
-        Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.ENTITY_NAME.getName()), ENTITY_NAME);
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.WORKFLOW_ID.getName()),
                 "workflow-01-00");
-        Assert.assertEquals(m.getString(ARG.workflowUser.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.WORKFLOW_USER.getName()),
                 "falcon");
-        Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
-        Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.RUN_ID.getName()), "1");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()),
                 "2011-01-01T01:00Z");
-        Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()),
                 "2012-01-01T01:00Z");
-        Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java b/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
index 8744ad8..6e80db8 100644
--- a/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
+++ b/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
@@ -43,7 +43,7 @@ public final class ResourcesReflectionUtil {
         buildAnnotationsMapForClass("org.apache.falcon.resource.proxy.SchedulableEntityManagerProxy");
         buildAnnotationsMapForClass("org.apache.falcon.resource.proxy.InstanceManagerProxy");
         buildAnnotationsMapForClass("org.apache.falcon.resource.AbstractInstanceManager");
-        buildAnnotationsMapForClass("org.apache.falcon.service.FalconTopicSubscriber");
+        buildAnnotationsMapForClass("org.apache.falcon.messaging.JMSMessageConsumer");
         buildAnnotationsMapForClass("org.apache.falcon.aspect.GenericAlert");
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index 44f4410..459307c 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -26,7 +26,6 @@ import org.apache.falcon.entity.ExternalId;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.apache.falcon.oozie.coordinator.CONFIGURATION;
 import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
@@ -36,6 +35,7 @@ import org.apache.falcon.oozie.feed.FeedRetentionCoordinatorBuilder;
 import org.apache.falcon.oozie.process.ProcessExecutionCoordinatorBuilder;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.OozieClient;
 
@@ -62,7 +62,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
         this.lifecycle = tag;
     }
 
-    public static final OozieCoordinatorBuilder get(Entity entity, Tag tag) {
+    public static OozieCoordinatorBuilder get(Entity entity, Tag tag) {
         switch(entity.getEntityType()) {
         case FEED:
             switch (tag) {
@@ -94,28 +94,30 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
         return EntityUtil.getWorkflowName(lifecycle, entity).toString();
     }
 
-    protected Path marshal(Cluster cluster, COORDINATORAPP coord, Path outPath) throws FalconException {
+    protected Path marshal(Cluster cluster, COORDINATORAPP coord,
+                           Path outPath) throws FalconException {
         return marshal(cluster, new ObjectFactory().createCoordinatorApp(coord),
             OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, "coordinator.xml"));
     }
 
-    protected Properties createCoordDefaultConfiguration(Cluster cluster, String coordName)  throws FalconException {
+    protected Properties createCoordDefaultConfiguration(Cluster cluster,
+                                                         String coordName)  throws FalconException {
         Properties props = new Properties();
-        props.put(ARG.entityName.getPropName(), entity.getName());
-        props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
-        props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
+        props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME_EL);
+        props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL);
         props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
         props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
-        String falconBrokerUrl = StartupProperties.get().getProperty(ARG.brokerUrl.getPropName(),
-            "tcp://localhost:61616?daemon=true");
-        props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
-        String falconBrokerImplClass = StartupProperties.get().getProperty(ARG.brokerImplClass.getPropName(),
-            ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
-        props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
+        String falconBrokerUrl = StartupProperties.get().getProperty(
+                WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true");
+        props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);
+        String falconBrokerImplClass = StartupProperties.get().getProperty(
+                WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+        props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);
         String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
             DEFAULT_BROKER_MSG_TTL.toString());
-        props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
-        props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
+        props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
+        props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
         props.put("logDir", getLogDirectory(cluster));
         props.put(OozieClient.EXTERNAL_ID,
             new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
@@ -132,7 +134,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
 
         props.put("entityName", entity.getName());
         props.put("entityType", entity.getEntityType().name().toLowerCase());
-        props.put(ARG.cluster.getPropName(), cluster.getName());
+        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
 
         props.put(MR_QUEUE_NAME, "default");
         props.put(MR_JOB_PRIORITY, "NORMAL");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 2798db2..d6115b2 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -37,7 +37,6 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.apache.falcon.oozie.OozieCoordinatorBuilder;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
@@ -45,6 +44,7 @@ import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
 import org.apache.falcon.oozie.coordinator.ACTION;
 import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -249,7 +249,8 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         props.put("sourceRelativePaths", IGNORE); // this will bot be used for Table storage.
     }
 
-    private void propagateLateDataProperties(String instancePaths, String falconFeedStorageType, Properties props) {
+    private void propagateLateDataProperties(String instancePaths,
+                                             String falconFeedStorageType, Properties props) {
         // todo these pairs are the same but used in different context
         // late data handler - should-record action
         props.put("falconInputFeeds", entity.getName());
@@ -260,11 +261,12 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
         props.put("falconInputFeedStorageTypes", falconFeedStorageType);
 
         // falcon post processing
-        props.put(ARG.feedNames.getPropName(), entity.getName());
-        props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
+        props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), "${coord:dataOut('output')}");
     }
 
-    private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
+    private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
+                                        Path buildPath) throws FalconException {
         Configuration conf = ClusterHelper.getConfiguration(trgCluster);
         FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
index 2238778..2a67fd3 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -26,14 +26,14 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
 import org.apache.falcon.oozie.OozieCoordinatorBuilder;
 import org.apache.falcon.oozie.OozieEntityBuilder;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.coordinator.ACTION;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.Path;
 
 import java.util.Arrays;
@@ -85,9 +85,10 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
 
         props.put("limit", feedCluster.getRetention().getLimit().toString());
 
-        props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
-        props.put(ARG.feedNames.getPropName(), entity.getName());
-        props.put(ARG.feedInstancePaths.getPropName(), IGNORE);
+        props.put(WorkflowExecutionArgs.OPERATION.getName(),
+                WorkflowExecutionContext.EntityOperations.DELETE.name());
+        props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), IGNORE);
 
         props.put("falconInputFeeds", entity.getName());
         props.put("falconInPaths", IGNORE);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index e907087..d391032 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -37,7 +37,6 @@ import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.apache.falcon.oozie.OozieCoordinatorBuilder;
 import org.apache.falcon.oozie.OozieEntityBuilder;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
@@ -50,6 +49,7 @@ import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
 import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.hadoop.fs.Path;
 
 import java.util.ArrayList;
@@ -243,8 +243,8 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
 
     private void initializeOutputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
         if (entity.getOutputs() == null) {
-            props.put(ARG.feedNames.getPropName(), "NONE");
-            props.put(ARG.feedInstancePaths.getPropName(), IGNORE);
+            props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), "NONE");
+            props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), IGNORE);
             return;
         }
 
@@ -282,8 +282,8 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
         }
 
         // Output feed name and path for parent workflow
-        props.put(ARG.feedNames.getPropName(), StringUtils.join(outputFeeds, ','));
-        props.put(ARG.feedInstancePaths.getPropName(), StringUtils.join(outputPaths, ','));
+        props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), StringUtils.join(outputFeeds, ','));
+        props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), StringUtils.join(outputPaths, ','));
     }
 
     private DATAOUT createDataOut(Output output) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index fc1af7b..927aba3 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -216,7 +216,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         // verify workflow params
         Assert.assertEquals(props.get("userWorkflowName"), "replication-policy");
-        Assert.assertEquals(props.get("userWorkflowVersion"), "0.5");
+        Assert.assertEquals(props.get("userWorkflowVersion"), "0.6");
         Assert.assertEquals(props.get("userWorkflowEngine"), "falcon");
 
         // verify default params

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index ffcc88a..45a8732 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -40,7 +40,6 @@ import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Validity;
 import org.apache.falcon.entity.v0.process.Workflow;
-import org.apache.falcon.messaging.EntityInstanceMessage;
 import org.apache.falcon.oozie.OozieEntityBuilder;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -55,6 +54,7 @@ import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -691,13 +691,13 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("logDir"), getLogPath(processEntity));
 
         String[] expected = {
-            EntityInstanceMessage.ARG.feedNames.getPropName(),
-            EntityInstanceMessage.ARG.feedInstancePaths.getPropName(),
-            "falconInputFeeds",
+            WorkflowExecutionArgs.FEED_NAMES.getName(),
+            WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+            WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(),
             "falconInPaths",
-            "userWorkflowName",
-            "userWorkflowVersion",
-            "userWorkflowEngine",
+            WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
+            WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
+            WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
         };
 
         for (String property : expected) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/test/resources/oozie/xmls/workflow.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/oozie/xmls/workflow.xml b/oozie/src/test/resources/oozie/xmls/workflow.xml
index 5d14413..81807a6 100644
--- a/oozie/src/test/resources/oozie/xmls/workflow.xml
+++ b/oozie/src/test/resources/oozie/xmls/workflow.xml
@@ -22,7 +22,7 @@
         <java>
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
-            <main-class>org.apache.falcon.messaging.MessageProducer</main-class>
+            <main-class>org.apache.falcon.messaging.JMSMessageProducer</main-class>
             <arg>${wf:name()}</arg>
             <arg>${wf:appPath()}</arg>
             <arg>${timestamp()}</arg>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
deleted file mode 100644
index 35e0ba3..0000000
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.service;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.aspect.GenericAlert;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.metadata.MetadataMappingService;
-import org.apache.falcon.rerun.event.RerunEvent.RerunType;
-import org.apache.falcon.rerun.handler.AbstractRerunHandler;
-import org.apache.falcon.rerun.handler.RerunHandlerFactory;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.workflow.WorkflowEngineFactory;
-import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Date;
-
-/**
- * Subscribes to the falcon topic for handling retries and alerts.
- */
-public class FalconTopicSubscriber implements MessageListener, ExceptionListener {
-    private static final Logger LOG = LoggerFactory.getLogger(FalconTopicSubscriber.class);
-
-    private final String implementation;
-    private final String userName;
-    private final String password;
-    private final String url;
-    private final String topicName;
-
-    private Connection connection;
-    private TopicSubscriber subscriber;
-
-    private AbstractRerunHandler retryHandler = RerunHandlerFactory.getRerunHandler(RerunType.RETRY);
-    private AbstractRerunHandler latedataHandler = RerunHandlerFactory.getRerunHandler(RerunType.LATE);
-
-    public FalconTopicSubscriber(String implementation, String userName,
-                                 String password, String url, String topicName) {
-        this.implementation = implementation;
-        this.userName = userName;
-        this.password = password;
-        this.url = url;
-        this.topicName = topicName;
-    }
-
-    public void startSubscriber() throws FalconException {
-        try {
-            connection = createAndGetConnection(implementation, userName, password, url);
-            TopicSession session = (TopicSession) connection.createSession(
-                    false, Session.AUTO_ACKNOWLEDGE);
-            Topic destination = session.createTopic(topicName);
-            subscriber = session.createSubscriber(destination);
-            subscriber.setMessageListener(this);
-            connection.setExceptionListener(this);
-            connection.start();
-        } catch (Exception e) {
-            LOG.error("Error starting subscriber of topic: {}", this, e);
-            throw new FalconException(e);
-        }
-    }
-
-    @Override
-    public void onMessage(Message message) {
-        MapMessage mapMessage = (MapMessage) message;
-        try {
-            if (LOG.isDebugEnabled()) {debug(mapMessage); }
-            String cluster = mapMessage.getString(ARG.cluster.getArgName());
-            String entityName = mapMessage.getString(ARG.entityName.getArgName());
-            String entityType = mapMessage.getString(ARG.entityType.getArgName());
-            String workflowId = mapMessage.getString(ARG.workflowId.getArgName());
-            String workflowUser = mapMessage.getString(ARG.workflowUser.getArgName());
-            String runId = mapMessage.getString(ARG.runId.getArgName());
-            String nominalTime = mapMessage.getString(ARG.nominalTime.getArgName());
-            String status = mapMessage.getString(ARG.status.getArgName());
-            String operation = mapMessage.getString(ARG.operation.getArgName());
-
-            CurrentUser.authenticate(workflowUser);
-            AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
-            InstancesResult result = wfEngine.getJobDetails(cluster, workflowId);
-            Date startTime = result.getInstances()[0].startTime;
-            Date endTime = result.getInstances()[0].endTime;
-            Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
-
-            if (status.equalsIgnoreCase("FAILED")) {
-                retryHandler.handleRerun(cluster, entityType, entityName,
-                        nominalTime, runId, workflowId, workflowUser,
-                        System.currentTimeMillis());
-
-                GenericAlert.instrumentFailedInstance(cluster, entityType,
-                        entityName, nominalTime, workflowId, workflowUser, runId, operation,
-                        SchemaHelper.formatDateUTC(startTime), "", "", duration);
-
-            } else if (status.equalsIgnoreCase("SUCCEEDED")) {
-                Entity entity = EntityUtil.getEntity(entityType, entityName);
-                //late data handling not applicable for feed retention action
-                if (!operation.equalsIgnoreCase("DELETE") && EntityUtil.getLateProcess(entity) != null) {
-                    latedataHandler.handleRerun(cluster, entityType, entityName,
-                        nominalTime, runId, workflowId, workflowUser,
-                        System.currentTimeMillis());
-                } else {
-                    LOG.info("Late data handling not applicable for entityType: {}, entityName: {} operation: {}",
-                            entityType, entityName, operation);
-                }
-                GenericAlert.instrumentSucceededInstance(cluster, entityType,
-                    entityName, nominalTime, workflowId, workflowUser, runId, operation,
-                    SchemaHelper.formatDateUTC(startTime), duration);
-
-                notifyMetadataMappingService(entityName, operation, mapMessage.getString(ARG.logDir.getArgName()));
-            }
-        } catch (JMSException e) {
-            LOG.info("Error in onMessage for subscriber of topic: {}", this, e);
-        } catch (FalconException e) {
-            LOG.info("Error in onMessage for subscriber of topic: {}", this, e);
-        } catch (Exception e) {
-            LOG.info("Error in onMessage for subscriber of topic: {}", this, e);
-        }
-    }
-
-    private void notifyMetadataMappingService(String entityName, String operation,
-                                              String logDir) throws FalconException {
-        if (Services.get().isRegistered(MetadataMappingService.SERVICE_NAME)) {
-            MetadataMappingService service = Services.get().getService(MetadataMappingService.SERVICE_NAME);
-            service.onSuccessfulWorkflowCompletion(entityName, operation, logDir);
-        }
-    }
-
-    private void debug(MapMessage mapMessage) throws JMSException {
-        StringBuilder buff = new StringBuilder();
-        buff.append("Received:{");
-        for (ARG arg : ARG.values()) {
-            buff.append(arg.getArgName()).append('=')
-                .append(mapMessage.getString(arg.getArgName())).append(", ");
-        }
-        buff.append("}");
-        LOG.debug(buff.toString());
-    }
-
-    @Override
-    public void onException(JMSException ignore) {
-        LOG.info("Error in onException for subscriber of topic: {}", this.toString(), ignore);
-    }
-
-    public void closeSubscriber() throws FalconException {
-        try {
-            LOG.info("Closing subscriber on topic: {}", this.topicName);
-            if (subscriber != null) {
-                subscriber.close();
-            }
-            if (connection != null) {
-                connection.close();
-            }
-        } catch (JMSException e) {
-            LOG.error("Error closing subscriber of topic: {}", this.toString(), e);
-            throw new FalconException(e);
-        }
-    }
-
-    private static Connection createAndGetConnection(String implementation,
-                                                     String userName, String password, String url)
-        throws JMSException, ClassNotFoundException, InstantiationException,
-            IllegalAccessException, InvocationTargetException, NoSuchMethodException {
-
-        @SuppressWarnings("unchecked")
-        Class<ConnectionFactory> clazz = (Class<ConnectionFactory>) FalconTopicSubscriber.class
-                .getClassLoader().loadClass(implementation);
-
-        ConnectionFactory connectionFactory = clazz.getConstructor(
-                String.class, String.class, String.class).newInstance(userName,
-                password, url);
-
-        return connectionFactory.createConnection();
-    }
-
-    @Override
-    public String toString() {
-        return topicName;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java b/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
index 9ae1ad1..06fec9f 100644
--- a/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
+++ b/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
@@ -18,14 +18,16 @@
 package org.apache.falcon.service;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.messaging.JMSMessageConsumer;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
 
 /**
  * A Falcon Service that initializes and starts a topic subscriber.
  */
 public class ProcessSubscriberService implements FalconService {
 
-    private FalconTopicSubscriber subscriber;
+    private JMSMessageConsumer subscriber;
 
     private static enum JMSProps {
         FalconBrokerImplClass("broker.impl.class", "org.apache.activemq.ActiveMQConnectionFactory"),
@@ -48,11 +50,18 @@ public class ProcessSubscriberService implements FalconService {
 
     @Override
     public void init() throws FalconException {
+        if (!Services.get().isRegistered(WorkflowJobEndNotificationService.NAME)) {
+            throw new FalconException("WorkflowJobEndNotificationService must be configured ahead");
+        }
+
         String falconBrokerImplClass = getPropertyValue(JMSProps.FalconBrokerImplClass);
         String falconBrokerUrl = getPropertyValue(JMSProps.FalconBrokerUrl);
         String falconEntityTopic = getPropertyValue(JMSProps.FalconEntityTopic);
 
-        subscriber = new FalconTopicSubscriber(falconBrokerImplClass, "", "", falconBrokerUrl, falconEntityTopic);
+        WorkflowJobEndNotificationService jobEndNotificationService =
+                Services.get().getService(WorkflowJobEndNotificationService.NAME);
+        subscriber = new JMSMessageConsumer(falconBrokerImplClass, "", "", falconBrokerUrl,
+                falconEntityTopic, jobEndNotificationService);
         subscriber.startSubscriber();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java b/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
deleted file mode 100644
index 9b1d42a..0000000
--- a/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.service;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.falcon.messaging.EntityInstanceMessage;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.mortbay.log.Log;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import javax.jms.*;
-
-/**
- * Test for FalconTopicSubscriber.
- */
-public class FalconTopicSubscriberTest {
-
-    private static final String BROKER_URL = "vm://localhost";
-    private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
-    private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC";
-    private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
-    private BrokerService broker;
-
-    @BeforeClass
-    public void setup() throws Exception {
-        broker = new BrokerService();
-        broker.addConnector(BROKER_URL);
-        broker.setDataDirectory("target/activemq");
-        broker.setBrokerName("localhost");
-        broker.start();
-    }
-
-    public void sendMessages(String topic) throws JMSException {
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
-                BROKER_URL);
-        Connection connection = connectionFactory.createConnection();
-        connection.start();
-
-        Session session = connection.createSession(false,
-                Session.AUTO_ACKNOWLEDGE);
-        Topic destination = session.createTopic(topic);
-        MessageProducer producer = session
-                .createProducer(destination);
-        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-        for (int i = 0; i < 3; i++) {
-            EntityInstanceMessage falconMessage = getMockFalconMessage(i);
-            MapMessage message = session.createMapMessage();
-            for (ARG arg : ARG.values()) {
-                message.setString(arg.getPropName(), falconMessage
-                        .getKeyValueMap().get(arg));
-            }
-            Log.debug("Sending:" + message);
-            producer.send(message);
-        }
-
-        EntityInstanceMessage message = getMockFalconMessage(15);
-        MapMessage mapMessage = session.createMapMessage();
-        message.getKeyValueMap().put(ARG.status, "FAILED");
-        for (ARG arg : ARG.values()) {
-            mapMessage.setString(arg.getPropName(), message.getKeyValueMap().get(arg));
-        }
-        producer.send(mapMessage);
-    }
-
-    private EntityInstanceMessage getMockFalconMessage(int i) {
-        EntityInstanceMessage message = new EntityInstanceMessage();
-        message.getKeyValueMap().put(ARG.brokerImplClass, BROKER_IMPL_CLASS);
-        message.getKeyValueMap().put(ARG.brokerUrl, BROKER_URL);
-        message.getKeyValueMap().put(ARG.entityName, "process1");
-        message.getKeyValueMap().put(ARG.entityType, "PROCESS");
-        message.getKeyValueMap().put(ARG.feedInstancePaths,
-                "/clicks/hour/00/0" + i);
-        message.getKeyValueMap().put(ARG.feedNames, "clicks");
-        message.getKeyValueMap().put(ARG.logFile, "/logfile");
-        message.getKeyValueMap().put(ARG.nominalTime, "2012-10-10-10-10");
-        message.getKeyValueMap().put(ARG.operation, "GENERATE");
-        message.getKeyValueMap().put(ARG.runId, "0");
-        message.getKeyValueMap().put(ARG.timeStamp, "2012-10-10-10-1" + i);
-        message.getKeyValueMap().put(ARG.workflowId, "workflow-" + i);
-        message.getKeyValueMap().put(ARG.topicName, TOPIC_NAME);
-        message.getKeyValueMap().put(ARG.status, "SUCCEEDED");
-        message.getKeyValueMap().put(ARG.workflowUser, "falcon");
-        return message;
-    }
-
-    @Test
-    public void testSubscriber() throws Exception{
-        //Comma separated topics are supported in startup properties
-        FalconTopicSubscriber subscriber = new FalconTopicSubscriber(
-                BROKER_IMPL_CLASS, "", "", BROKER_URL, TOPIC_NAME+","+SECONDARY_TOPIC_NAME);
-        subscriber.startSubscriber();
-        sendMessages(TOPIC_NAME);
-        Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 9);
-        sendMessages(SECONDARY_TOPIC_NAME);
-        Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 17);
-        Assert.assertEquals(broker.getAdminView().getTotalConsumerCount(), 2);
-        subscriber.closeSubscriber();
-    }
-
-    @AfterClass
-    public void tearDown() throws Exception {
-        broker.deleteAllMessages();
-        broker.stop();
-    }
-}