You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/08 22:31:35 UTC
[2/8] FALCON-485 - Simplify JMS Message Sender/Consumer and use
Workflow Context. Contributed by Venkatesh Seetharam
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index 9f8b07f..95a3780 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -20,13 +20,20 @@ package org.apache.falcon.messaging;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
/**
* Test for process message producer.
@@ -36,31 +43,36 @@ public class ProcessProducerTest {
private String[] args;
private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
- private static final String TOPIC_NAME = "FALCON.PROCESS";
+ private static final String ENTITY_NAME = "process1";
private BrokerService broker;
private volatile AssertionError error;
@BeforeClass
public void setup() throws Exception {
- args = new String[]{"-" + ARG.entityName.getArgName(), TOPIC_NAME,
- "-" + ARG.feedNames.getArgName(), "click-logs,raw-logs",
- "-" + ARG.feedInstancePaths.getArgName(),
- "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
- "-" + ARG.workflowId.getArgName(), "workflow-01-00",
- "-" + ARG.workflowUser.getArgName(), "falcon",
- "-" + ARG.runId.getArgName(), "1",
- "-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
- "-" + ARG.timeStamp.getArgName(), "2012-01-01-01-00",
- "-" + ARG.brokerUrl.getArgName(), BROKER_URL,
- "-" + ARG.brokerImplClass.getArgName(), (BROKER_IMPL_CLASS),
- "-" + ARG.entityType.getArgName(), ("process"),
- "-" + ARG.operation.getArgName(), ("GENERATE"),
- "-" + ARG.logFile.getArgName(), ("/logFile"),
- "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
- "-" + ARG.status.getArgName(), ("SUCCEEDED"),
- "-" + ARG.brokerTTL.getArgName(), "10",
- "-" + ARG.cluster.getArgName(), "corp", };
+ args = new String[] {
+ "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
+ "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "click-logs,raw-logs",
+ "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+ "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
+ "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+ "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
+ "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+ "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), "2011-01-01-01-00",
+ "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), "2012-01-01-01-00",
+ "-" + WorkflowExecutionArgs.BRKR_URL.getName(), BROKER_URL,
+ "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
+ "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), BROKER_URL,
+ "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
+ "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
+ "-" + WorkflowExecutionArgs.OPERATION.getName(), "GENERATE",
+ "-" + WorkflowExecutionArgs.LOG_FILE.getName(), "/logFile",
+ "-" + WorkflowExecutionArgs.LOG_DIR.getName(), "/falcon/feed/agg-logs/",
+ "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+ "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "10",
+ "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), "corp",
+ };
+
broker = new BrokerService();
broker.addConnector(BROKER_URL);
broker.setDataDirectory("target/activemq");
@@ -92,7 +104,13 @@ public class ProcessProducerTest {
};
t.start();
Thread.sleep(100);
- new MessageProducer().run(this.args);
+
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(
+ args, WorkflowExecutionContext.Type.POST_PROCESSING);
+ JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+ .type(JMSMessageProducer.MessageType.USER).build();
+ jmsMessageProducer.sendMessage();
+
t.join();
if (error != null) {
throw error;
@@ -100,26 +118,23 @@ public class ProcessProducerTest {
}
private void consumer() throws JMSException {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
- BROKER_URL);
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
- Session session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic(TOPIC_NAME);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createTopic(getTopicName());
MessageConsumer consumer = session.createConsumer(destination);
- // wait till you get atleast one message
+ // wait till you get at least one message
MapMessage m;
for (m = null; m == null;) {
m = (MapMessage) consumer.receive();
}
System.out.println("Consumed: " + m.toString());
assertMessage(m);
- Assert.assertEquals(m.getString(ARG.feedNames.getArgName()),
- "click-logs");
- Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()), "click-logs");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
"/click-logs/10/05/05/00/20");
for (m = null; m == null;) {
@@ -127,24 +142,27 @@ public class ProcessProducerTest {
}
System.out.println("Consumed: " + m.toString());
assertMessage(m);
- Assert.assertEquals(m.getString(ARG.feedNames.getArgName()), "raw-logs");
- Assert.assertEquals(m.getString(ARG.feedInstancePaths.getArgName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()), "raw-logs");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
"/raw-logs/10/05/05/00/20");
connection.close();
}
+ private String getTopicName() {
+ return JMSMessageProducer.FALCON_TOPIC_PREFIX + ENTITY_NAME;
+ }
+
private void assertMessage(MapMessage m) throws JMSException {
- Assert.assertEquals(m.getString(ARG.entityName.getArgName()),
- TOPIC_NAME);
- Assert.assertEquals(m.getString(ARG.workflowId.getArgName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.ENTITY_NAME.getName()), ENTITY_NAME);
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.WORKFLOW_ID.getName()),
"workflow-01-00");
- Assert.assertEquals(m.getString(ARG.workflowUser.getArgName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.WORKFLOW_USER.getName()),
"falcon");
- Assert.assertEquals(m.getString(ARG.runId.getArgName()), "1");
- Assert.assertEquals(m.getString(ARG.nominalTime.getArgName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.RUN_ID.getName()), "1");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()),
"2011-01-01T01:00Z");
- Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()),
"2012-01-01T01:00Z");
- Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java b/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
index 8744ad8..6e80db8 100644
--- a/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
+++ b/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
@@ -43,7 +43,7 @@ public final class ResourcesReflectionUtil {
buildAnnotationsMapForClass("org.apache.falcon.resource.proxy.SchedulableEntityManagerProxy");
buildAnnotationsMapForClass("org.apache.falcon.resource.proxy.InstanceManagerProxy");
buildAnnotationsMapForClass("org.apache.falcon.resource.AbstractInstanceManager");
- buildAnnotationsMapForClass("org.apache.falcon.service.FalconTopicSubscriber");
+ buildAnnotationsMapForClass("org.apache.falcon.messaging.JMSMessageConsumer");
buildAnnotationsMapForClass("org.apache.falcon.aspect.GenericAlert");
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index 44f4410..459307c 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -26,7 +26,6 @@ import org.apache.falcon.entity.ExternalId;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
import org.apache.falcon.oozie.coordinator.CONFIGURATION;
import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
@@ -36,6 +35,7 @@ import org.apache.falcon.oozie.feed.FeedRetentionCoordinatorBuilder;
import org.apache.falcon.oozie.process.ProcessExecutionCoordinatorBuilder;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.OozieClient;
@@ -62,7 +62,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
this.lifecycle = tag;
}
- public static final OozieCoordinatorBuilder get(Entity entity, Tag tag) {
+ public static OozieCoordinatorBuilder get(Entity entity, Tag tag) {
switch(entity.getEntityType()) {
case FEED:
switch (tag) {
@@ -94,28 +94,30 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
return EntityUtil.getWorkflowName(lifecycle, entity).toString();
}
- protected Path marshal(Cluster cluster, COORDINATORAPP coord, Path outPath) throws FalconException {
+ protected Path marshal(Cluster cluster, COORDINATORAPP coord,
+ Path outPath) throws FalconException {
return marshal(cluster, new ObjectFactory().createCoordinatorApp(coord),
OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, "coordinator.xml"));
}
- protected Properties createCoordDefaultConfiguration(Cluster cluster, String coordName) throws FalconException {
+ protected Properties createCoordDefaultConfiguration(Cluster cluster,
+ String coordName) throws FalconException {
Properties props = new Properties();
- props.put(ARG.entityName.getPropName(), entity.getName());
- props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
- props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
+ props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName());
+ props.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME_EL);
+ props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL);
props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
- String falconBrokerUrl = StartupProperties.get().getProperty(ARG.brokerUrl.getPropName(),
- "tcp://localhost:61616?daemon=true");
- props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
- String falconBrokerImplClass = StartupProperties.get().getProperty(ARG.brokerImplClass.getPropName(),
- ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
- props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
+ String falconBrokerUrl = StartupProperties.get().getProperty(
+ WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true");
+ props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);
+ String falconBrokerImplClass = StartupProperties.get().getProperty(
+ WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+ props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);
String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
DEFAULT_BROKER_MSG_TTL.toString());
- props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
- props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
+ props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
+ props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
props.put("logDir", getLogDirectory(cluster));
props.put(OozieClient.EXTERNAL_ID,
new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
@@ -132,7 +134,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
props.put("entityName", entity.getName());
props.put("entityType", entity.getEntityType().name().toLowerCase());
- props.put(ARG.cluster.getPropName(), cluster.getName());
+ props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
props.put(MR_QUEUE_NAME, "default");
props.put(MR_JOB_PRIORITY, "NORMAL");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 2798db2..d6115b2 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -37,7 +37,6 @@ import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
import org.apache.falcon.oozie.OozieCoordinatorBuilder;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
@@ -45,6 +44,7 @@ import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.coordinator.WORKFLOW;
import org.apache.falcon.oozie.coordinator.ACTION;
import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -249,7 +249,8 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
props.put("sourceRelativePaths", IGNORE); // this will bot be used for Table storage.
}
- private void propagateLateDataProperties(String instancePaths, String falconFeedStorageType, Properties props) {
+ private void propagateLateDataProperties(String instancePaths,
+ String falconFeedStorageType, Properties props) {
// todo these pairs are the same but used in different context
// late data handler - should-record action
props.put("falconInputFeeds", entity.getName());
@@ -260,11 +261,12 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
props.put("falconInputFeedStorageTypes", falconFeedStorageType);
// falcon post processing
- props.put(ARG.feedNames.getPropName(), entity.getName());
- props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
+ props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), entity.getName());
+ props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), "${coord:dataOut('output')}");
}
- private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
+ private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
+ Path buildPath) throws FalconException {
Configuration conf = ClusterHelper.getConfiguration(trgCluster);
FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
index 2238778..2a67fd3 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -26,14 +26,14 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
import org.apache.falcon.oozie.OozieCoordinatorBuilder;
import org.apache.falcon.oozie.OozieEntityBuilder;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.coordinator.ACTION;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;
import java.util.Arrays;
@@ -85,9 +85,10 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
props.put("limit", feedCluster.getRetention().getLimit().toString());
- props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
- props.put(ARG.feedNames.getPropName(), entity.getName());
- props.put(ARG.feedInstancePaths.getPropName(), IGNORE);
+ props.put(WorkflowExecutionArgs.OPERATION.getName(),
+ WorkflowExecutionContext.EntityOperations.DELETE.name());
+ props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), entity.getName());
+ props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), IGNORE);
props.put("falconInputFeeds", entity.getName());
props.put("falconInPaths", IGNORE);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index e907087..d391032 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -37,7 +37,6 @@ import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
import org.apache.falcon.oozie.OozieCoordinatorBuilder;
import org.apache.falcon.oozie.OozieEntityBuilder;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
@@ -50,6 +49,7 @@ import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
@@ -243,8 +243,8 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
private void initializeOutputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
if (entity.getOutputs() == null) {
- props.put(ARG.feedNames.getPropName(), "NONE");
- props.put(ARG.feedInstancePaths.getPropName(), IGNORE);
+ props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), "NONE");
+ props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), IGNORE);
return;
}
@@ -282,8 +282,8 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
}
// Output feed name and path for parent workflow
- props.put(ARG.feedNames.getPropName(), StringUtils.join(outputFeeds, ','));
- props.put(ARG.feedInstancePaths.getPropName(), StringUtils.join(outputPaths, ','));
+ props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), StringUtils.join(outputFeeds, ','));
+ props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), StringUtils.join(outputPaths, ','));
}
private DATAOUT createDataOut(Output output) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index fc1af7b..927aba3 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -216,7 +216,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
// verify workflow params
Assert.assertEquals(props.get("userWorkflowName"), "replication-policy");
- Assert.assertEquals(props.get("userWorkflowVersion"), "0.5");
+ Assert.assertEquals(props.get("userWorkflowVersion"), "0.6");
Assert.assertEquals(props.get("userWorkflowEngine"), "falcon");
// verify default params
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index ffcc88a..45a8732 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -40,7 +40,6 @@ import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Validity;
import org.apache.falcon.entity.v0.process.Workflow;
-import org.apache.falcon.messaging.EntityInstanceMessage;
import org.apache.falcon.oozie.OozieEntityBuilder;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -55,6 +54,7 @@ import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -691,13 +691,13 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(props.get("logDir"), getLogPath(processEntity));
String[] expected = {
- EntityInstanceMessage.ARG.feedNames.getPropName(),
- EntityInstanceMessage.ARG.feedInstancePaths.getPropName(),
- "falconInputFeeds",
+ WorkflowExecutionArgs.FEED_NAMES.getName(),
+ WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
+ WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(),
"falconInPaths",
- "userWorkflowName",
- "userWorkflowVersion",
- "userWorkflowEngine",
+ WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
+ WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
+ WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
};
for (String property : expected) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/oozie/src/test/resources/oozie/xmls/workflow.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/oozie/xmls/workflow.xml b/oozie/src/test/resources/oozie/xmls/workflow.xml
index 5d14413..81807a6 100644
--- a/oozie/src/test/resources/oozie/xmls/workflow.xml
+++ b/oozie/src/test/resources/oozie/xmls/workflow.xml
@@ -22,7 +22,7 @@
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
- <main-class>org.apache.falcon.messaging.MessageProducer</main-class>
+ <main-class>org.apache.falcon.messaging.JMSMessageProducer</main-class>
<arg>${wf:name()}</arg>
<arg>${wf:appPath()}</arg>
<arg>${timestamp()}</arg>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
deleted file mode 100644
index 35e0ba3..0000000
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.service;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.aspect.GenericAlert;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.metadata.MetadataMappingService;
-import org.apache.falcon.rerun.event.RerunEvent.RerunType;
-import org.apache.falcon.rerun.handler.AbstractRerunHandler;
-import org.apache.falcon.rerun.handler.RerunHandlerFactory;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.workflow.WorkflowEngineFactory;
-import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Date;
-
-/**
- * Subscribes to the falcon topic for handling retries and alerts.
- */
-public class FalconTopicSubscriber implements MessageListener, ExceptionListener {
- private static final Logger LOG = LoggerFactory.getLogger(FalconTopicSubscriber.class);
-
- private final String implementation;
- private final String userName;
- private final String password;
- private final String url;
- private final String topicName;
-
- private Connection connection;
- private TopicSubscriber subscriber;
-
- private AbstractRerunHandler retryHandler = RerunHandlerFactory.getRerunHandler(RerunType.RETRY);
- private AbstractRerunHandler latedataHandler = RerunHandlerFactory.getRerunHandler(RerunType.LATE);
-
- public FalconTopicSubscriber(String implementation, String userName,
- String password, String url, String topicName) {
- this.implementation = implementation;
- this.userName = userName;
- this.password = password;
- this.url = url;
- this.topicName = topicName;
- }
-
- public void startSubscriber() throws FalconException {
- try {
- connection = createAndGetConnection(implementation, userName, password, url);
- TopicSession session = (TopicSession) connection.createSession(
- false, Session.AUTO_ACKNOWLEDGE);
- Topic destination = session.createTopic(topicName);
- subscriber = session.createSubscriber(destination);
- subscriber.setMessageListener(this);
- connection.setExceptionListener(this);
- connection.start();
- } catch (Exception e) {
- LOG.error("Error starting subscriber of topic: {}", this, e);
- throw new FalconException(e);
- }
- }
-
- @Override
- public void onMessage(Message message) {
- MapMessage mapMessage = (MapMessage) message;
- try {
- if (LOG.isDebugEnabled()) {debug(mapMessage); }
- String cluster = mapMessage.getString(ARG.cluster.getArgName());
- String entityName = mapMessage.getString(ARG.entityName.getArgName());
- String entityType = mapMessage.getString(ARG.entityType.getArgName());
- String workflowId = mapMessage.getString(ARG.workflowId.getArgName());
- String workflowUser = mapMessage.getString(ARG.workflowUser.getArgName());
- String runId = mapMessage.getString(ARG.runId.getArgName());
- String nominalTime = mapMessage.getString(ARG.nominalTime.getArgName());
- String status = mapMessage.getString(ARG.status.getArgName());
- String operation = mapMessage.getString(ARG.operation.getArgName());
-
- CurrentUser.authenticate(workflowUser);
- AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
- InstancesResult result = wfEngine.getJobDetails(cluster, workflowId);
- Date startTime = result.getInstances()[0].startTime;
- Date endTime = result.getInstances()[0].endTime;
- Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
-
- if (status.equalsIgnoreCase("FAILED")) {
- retryHandler.handleRerun(cluster, entityType, entityName,
- nominalTime, runId, workflowId, workflowUser,
- System.currentTimeMillis());
-
- GenericAlert.instrumentFailedInstance(cluster, entityType,
- entityName, nominalTime, workflowId, workflowUser, runId, operation,
- SchemaHelper.formatDateUTC(startTime), "", "", duration);
-
- } else if (status.equalsIgnoreCase("SUCCEEDED")) {
- Entity entity = EntityUtil.getEntity(entityType, entityName);
- //late data handling not applicable for feed retention action
- if (!operation.equalsIgnoreCase("DELETE") && EntityUtil.getLateProcess(entity) != null) {
- latedataHandler.handleRerun(cluster, entityType, entityName,
- nominalTime, runId, workflowId, workflowUser,
- System.currentTimeMillis());
- } else {
- LOG.info("Late data handling not applicable for entityType: {}, entityName: {} operation: {}",
- entityType, entityName, operation);
- }
- GenericAlert.instrumentSucceededInstance(cluster, entityType,
- entityName, nominalTime, workflowId, workflowUser, runId, operation,
- SchemaHelper.formatDateUTC(startTime), duration);
-
- notifyMetadataMappingService(entityName, operation, mapMessage.getString(ARG.logDir.getArgName()));
- }
- } catch (JMSException e) {
- LOG.info("Error in onMessage for subscriber of topic: {}", this, e);
- } catch (FalconException e) {
- LOG.info("Error in onMessage for subscriber of topic: {}", this, e);
- } catch (Exception e) {
- LOG.info("Error in onMessage for subscriber of topic: {}", this, e);
- }
- }
-
- private void notifyMetadataMappingService(String entityName, String operation,
- String logDir) throws FalconException {
- if (Services.get().isRegistered(MetadataMappingService.SERVICE_NAME)) {
- MetadataMappingService service = Services.get().getService(MetadataMappingService.SERVICE_NAME);
- service.onSuccessfulWorkflowCompletion(entityName, operation, logDir);
- }
- }
-
- private void debug(MapMessage mapMessage) throws JMSException {
- StringBuilder buff = new StringBuilder();
- buff.append("Received:{");
- for (ARG arg : ARG.values()) {
- buff.append(arg.getArgName()).append('=')
- .append(mapMessage.getString(arg.getArgName())).append(", ");
- }
- buff.append("}");
- LOG.debug(buff.toString());
- }
-
- @Override
- public void onException(JMSException ignore) {
- LOG.info("Error in onException for subscriber of topic: {}", this.toString(), ignore);
- }
-
- public void closeSubscriber() throws FalconException {
- try {
- LOG.info("Closing subscriber on topic: {}", this.topicName);
- if (subscriber != null) {
- subscriber.close();
- }
- if (connection != null) {
- connection.close();
- }
- } catch (JMSException e) {
- LOG.error("Error closing subscriber of topic: {}", this.toString(), e);
- throw new FalconException(e);
- }
- }
-
- private static Connection createAndGetConnection(String implementation,
- String userName, String password, String url)
- throws JMSException, ClassNotFoundException, InstantiationException,
- IllegalAccessException, InvocationTargetException, NoSuchMethodException {
-
- @SuppressWarnings("unchecked")
- Class<ConnectionFactory> clazz = (Class<ConnectionFactory>) FalconTopicSubscriber.class
- .getClassLoader().loadClass(implementation);
-
- ConnectionFactory connectionFactory = clazz.getConstructor(
- String.class, String.class, String.class).newInstance(userName,
- password, url);
-
- return connectionFactory.createConnection();
- }
-
- @Override
- public String toString() {
- return topicName;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java b/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
index 9ae1ad1..06fec9f 100644
--- a/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
+++ b/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
@@ -18,14 +18,16 @@
package org.apache.falcon.service;
import org.apache.falcon.FalconException;
+import org.apache.falcon.messaging.JMSMessageConsumer;
import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
/**
* A Falcon Service that initializes and starts a topic subscriber.
*/
public class ProcessSubscriberService implements FalconService {
- private FalconTopicSubscriber subscriber;
+ private JMSMessageConsumer subscriber;
private static enum JMSProps {
FalconBrokerImplClass("broker.impl.class", "org.apache.activemq.ActiveMQConnectionFactory"),
@@ -48,11 +50,18 @@ public class ProcessSubscriberService implements FalconService {
@Override
public void init() throws FalconException {
+ if (!Services.get().isRegistered(WorkflowJobEndNotificationService.NAME)) {
+ throw new FalconException("WorkflowJobEndNotificationService must be configured ahead");
+ }
+
String falconBrokerImplClass = getPropertyValue(JMSProps.FalconBrokerImplClass);
String falconBrokerUrl = getPropertyValue(JMSProps.FalconBrokerUrl);
String falconEntityTopic = getPropertyValue(JMSProps.FalconEntityTopic);
- subscriber = new FalconTopicSubscriber(falconBrokerImplClass, "", "", falconBrokerUrl, falconEntityTopic);
+ WorkflowJobEndNotificationService jobEndNotificationService =
+ Services.get().getService(WorkflowJobEndNotificationService.NAME);
+ subscriber = new JMSMessageConsumer(falconBrokerImplClass, "", "", falconBrokerUrl,
+ falconEntityTopic, jobEndNotificationService);
subscriber.startSubscriber();
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/23762e55/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java b/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
deleted file mode 100644
index 9b1d42a..0000000
--- a/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.service;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.falcon.messaging.EntityInstanceMessage;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.mortbay.log.Log;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import javax.jms.*;
-
-/**
- * Test for FalconTopicSubscriber.
- */
-public class FalconTopicSubscriberTest {
-
- private static final String BROKER_URL = "vm://localhost";
- private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
- private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC";
- private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
- private BrokerService broker;
-
- @BeforeClass
- public void setup() throws Exception {
- broker = new BrokerService();
- broker.addConnector(BROKER_URL);
- broker.setDataDirectory("target/activemq");
- broker.setBrokerName("localhost");
- broker.start();
- }
-
- public void sendMessages(String topic) throws JMSException {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
- BROKER_URL);
- Connection connection = connectionFactory.createConnection();
- connection.start();
-
- Session session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- Topic destination = session.createTopic(topic);
- MessageProducer producer = session
- .createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i = 0; i < 3; i++) {
- EntityInstanceMessage falconMessage = getMockFalconMessage(i);
- MapMessage message = session.createMapMessage();
- for (ARG arg : ARG.values()) {
- message.setString(arg.getPropName(), falconMessage
- .getKeyValueMap().get(arg));
- }
- Log.debug("Sending:" + message);
- producer.send(message);
- }
-
- EntityInstanceMessage message = getMockFalconMessage(15);
- MapMessage mapMessage = session.createMapMessage();
- message.getKeyValueMap().put(ARG.status, "FAILED");
- for (ARG arg : ARG.values()) {
- mapMessage.setString(arg.getPropName(), message.getKeyValueMap().get(arg));
- }
- producer.send(mapMessage);
- }
-
- private EntityInstanceMessage getMockFalconMessage(int i) {
- EntityInstanceMessage message = new EntityInstanceMessage();
- message.getKeyValueMap().put(ARG.brokerImplClass, BROKER_IMPL_CLASS);
- message.getKeyValueMap().put(ARG.brokerUrl, BROKER_URL);
- message.getKeyValueMap().put(ARG.entityName, "process1");
- message.getKeyValueMap().put(ARG.entityType, "PROCESS");
- message.getKeyValueMap().put(ARG.feedInstancePaths,
- "/clicks/hour/00/0" + i);
- message.getKeyValueMap().put(ARG.feedNames, "clicks");
- message.getKeyValueMap().put(ARG.logFile, "/logfile");
- message.getKeyValueMap().put(ARG.nominalTime, "2012-10-10-10-10");
- message.getKeyValueMap().put(ARG.operation, "GENERATE");
- message.getKeyValueMap().put(ARG.runId, "0");
- message.getKeyValueMap().put(ARG.timeStamp, "2012-10-10-10-1" + i);
- message.getKeyValueMap().put(ARG.workflowId, "workflow-" + i);
- message.getKeyValueMap().put(ARG.topicName, TOPIC_NAME);
- message.getKeyValueMap().put(ARG.status, "SUCCEEDED");
- message.getKeyValueMap().put(ARG.workflowUser, "falcon");
- return message;
- }
-
- @Test
- public void testSubscriber() throws Exception{
- //Comma separated topics are supported in startup properties
- FalconTopicSubscriber subscriber = new FalconTopicSubscriber(
- BROKER_IMPL_CLASS, "", "", BROKER_URL, TOPIC_NAME+","+SECONDARY_TOPIC_NAME);
- subscriber.startSubscriber();
- sendMessages(TOPIC_NAME);
- Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 9);
- sendMessages(SECONDARY_TOPIC_NAME);
- Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 17);
- Assert.assertEquals(broker.getAdminView().getTotalConsumerCount(), 2);
- subscriber.closeSubscriber();
- }
-
- @AfterClass
- public void tearDown() throws Exception {
- broker.deleteAllMessages();
- broker.stop();
- }
-}