You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/07/13 11:02:08 UTC
falcon git commit: FALCON-1170 Falcon Native Scheduler - Refactor
existing workflow/coord/bundle builder. Contributed by Pallavi Rao
Repository: falcon
Updated Branches:
refs/heads/master b36a82394 -> 9f69ae271
FALCON-1170 Falcon Native Scheduler - Refactor existing workflow/coord/bundle builder. Contributed by Pallavi Rao
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9f69ae27
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9f69ae27
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9f69ae27
Branch: refs/heads/master
Commit: 9f69ae27159436721c3fa1fdc401bb0de0cdca80
Parents: b36a823
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Jul 13 14:13:38 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Jul 13 14:27:03 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/falcon/oozie/OozieBundleBuilder.java | 38 +------
.../falcon/oozie/OozieCoordinatorBuilder.java | 70 ++----------
.../apache/falcon/oozie/OozieEntityBuilder.java | 32 ++++--
.../OozieOrchestrationWorkflowBuilder.java | 108 +++++++++++++++++--
.../feed/FSReplicationWorkflowBuilder.java | 12 +++
.../feed/FeedReplicationCoordinatorBuilder.java | 47 ++------
.../feed/FeedReplicationWorkflowBuilder.java | 44 +++++++-
.../feed/FeedRetentionCoordinatorBuilder.java | 37 ++-----
.../feed/FeedRetentionWorkflowBuilder.java | 42 +++++++-
.../feed/HCatReplicationWorkflowBuilder.java | 8 ++
.../ProcessExecutionCoordinatorBuilder.java | 23 +---
.../ProcessExecutionWorkflowBuilder.java | 31 +++++-
.../java/org/apache/falcon/util/OozieUtils.java | 6 +-
.../feed/OozieFeedWorkflowBuilderTest.java | 64 ++++++-----
.../falcon/oozie/process/AbstractTestBase.java | 55 +++++++---
.../OozieProcessWorkflowBuilderTest.java | 54 +++++-----
17 files changed, 403 insertions(+), 270 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e844a60..88fdfdd 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava)
IMPROVEMENTS
+ FALCON-1170 Falcon Native Scheduler - Refactor existing workflow/coord/bundle builder(Pallavi Rao via Ajay Yadava)
+
FALCON-1031 Make post processing notifications to user topics optional (Pallavi Rao via Ajay Yadava)
FALCON-1186 Add filtering capability to result of instance summary (Suhas Vasu)
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index 03063f4..8026967 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -19,12 +19,10 @@
package org.apache.falcon.oozie;
import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
import org.apache.falcon.oozie.bundle.CONFIGURATION;
@@ -83,9 +81,9 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
final String coordName = coordProps.getProperty(OozieEntityBuilder.ENTITY_NAME);
coord.setName(coordName);
coord.setAppPath(getStoragePath(coordPath));
- Properties appProps = createAppProperties(cluster, buildPath, coordName);
- appProps.putAll(coordProps);
- coord.setConfiguration(getConfig(appProps));
+ coordProps.put(OozieClient.USER_NAME, CurrentUser.getUser());
+ coordProps.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
+ coord.setConfiguration(getConfig(coordProps));
bundle.getCoordinator().add(coord);
}
@@ -114,35 +112,9 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
return conf;
}
- protected Properties createAppProperties(Cluster cluster, Path buildPath,
- String coordName) throws FalconException {
- Properties properties = getEntityProperties(cluster);
- properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
- properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
- properties.setProperty("colo.name", cluster.getColo());
-
- properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser());
- properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
- properties.setProperty("falcon.libpath",
- ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/lib");
-
- if (EntityUtil.isTableStorageType(cluster, entity)) {
- Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity);
- if (tag == Tag.REPLICATION) {
- // todo: kludge send source hcat creds for coord dependency check to pass
- String srcClusterName = EntityUtil.getWorkflowNameSuffix(coordName, entity);
- properties.putAll(getHiveCredentials(ClusterHelper.getCluster(srcClusterName)));
- } else {
- properties.putAll(getHiveCredentials(cluster));
- }
- }
-
- return properties;
- }
-
protected Path marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
return marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
- OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
+ OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
}
//Used by coordinator builders to return multiple coords
@@ -152,7 +124,7 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
public static BUNDLEAPP unmarshal(Cluster cluster, Path path) throws FalconException {
try {
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
- path.toUri(), ClusterHelper.getConfiguration(cluster));
+ path.toUri(), ClusterHelper.getConfiguration(cluster));
Unmarshaller unmarshaller = OozieUtils.BUNDLE_JAXB_CONTEXT.createUnmarshaller();
@SuppressWarnings("unchecked") JAXBElement<BUNDLEAPP> jaxbElement =
unmarshaller.unmarshal(new StreamSource(fs.open(path)), BUNDLEAPP.class);
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index 92697b0..85f5330 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -21,23 +21,19 @@ package org.apache.falcon.oozie;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
-import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.ExternalId;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.oozie.coordinator.CONFIGURATION;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.ObjectFactory;
import org.apache.falcon.oozie.feed.FeedReplicationCoordinatorBuilder;
import org.apache.falcon.oozie.feed.FeedRetentionCoordinatorBuilder;
import org.apache.falcon.oozie.process.ProcessExecutionCoordinatorBuilder;
import org.apache.falcon.util.OozieUtils;
-import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.OozieClient;
@@ -52,9 +48,7 @@ import java.util.Properties;
public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEntityBuilder<T> {
protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
- protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
- protected static final String MR_QUEUE_NAME = "queueName";
- protected static final String MR_JOB_PRIORITY = "jobPriority";
+
protected static final String IGNORE = "IGNORE";
private static final Object USER_JMS_NOTIFICATION_ENABLED = "userJMSNotificationEnabled";
@@ -111,84 +105,36 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, "coordinator.xml"));
}
- protected Properties createCoordDefaultConfiguration(Cluster cluster,
- String coordName) throws FalconException {
+ protected Properties createCoordDefaultConfiguration(String coordName) throws FalconException {
Properties props = new Properties();
- props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName());
- props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
- props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
props.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME_EL);
props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL);
- props.put("falconDataOperation", getOperation().name());
-
- props.put(WorkflowExecutionArgs.LOG_DIR.getName(),
- getStoragePath(EntityUtil.getLogPath(cluster, entity)));
props.put(OozieClient.EXTERNAL_ID,
new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
"${coord:nominalTime()}").getId());
- props.put(WorkflowExecutionArgs.WF_ENGINE_URL.getName(), ClusterHelper.getOozieUrl(cluster));
-
- addLateDataProperties(props);
- addBrokerProperties(cluster, props);
-
- props.put(MR_QUEUE_NAME, "default");
- props.put(MR_JOB_PRIORITY, "NORMAL");
props.put(USER_JMS_NOTIFICATION_ENABLED, "true");
- //props in entity override the set props.
- props.putAll(getEntityProperties(entity));
return props;
}
- protected abstract WorkflowExecutionContext.EntityOperations getOperation();
-
- private void addLateDataProperties(Properties props) throws FalconException {
- if (EntityUtil.getLateProcess(entity) == null
- || EntityUtil.getLateProcess(entity).getLateInputs() == null
- || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
- props.put("shouldRecord", "false");
- } else {
- props.put("shouldRecord", "true");
- }
+ public final Properties build(Cluster cluster, Path buildPath) throws FalconException {
+ throw new IllegalStateException("Not implemented for coordinator!");
}
- private void addBrokerProperties(Cluster cluster, Properties props) {
- props.put(WorkflowExecutionArgs.USER_BRKR_URL.getName(),
- ClusterHelper.getMessageBrokerUrl(cluster));
- props.put(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(),
- ClusterHelper.getMessageBrokerImplClass(cluster));
-
- String falconBrokerUrl = StartupProperties.get().getProperty(
- "broker.url", "tcp://localhost:61616?daemon=true");
- props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);
-
- String falconBrokerImplClass = StartupProperties.get().getProperty(
- "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
- props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);
+ public abstract List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException;
- String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
- DEFAULT_BROKER_MSG_TTL.toString());
- props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
+ protected COORDINATORAPP unmarshal(String template) throws FalconException {
+ return unmarshal(template, OozieUtils.COORD_JAXB_CONTEXT, COORDINATORAPP.class);
}
protected CONFIGURATION getConfig(Properties props) {
CONFIGURATION conf = new CONFIGURATION();
for (Entry<Object, Object> prop : props.entrySet()) {
- Property confProp = new Property();
+ CONFIGURATION.Property confProp = new CONFIGURATION.Property();
confProp.setName((String) prop.getKey());
confProp.setValue((String) prop.getValue());
conf.getProperty().add(confProp);
}
return conf;
}
-
- public final Properties build(Cluster cluster, Path buildPath) throws FalconException {
- throw new IllegalStateException("Not implemented for coordinator!");
- }
-
- public abstract List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException;
-
- protected COORDINATORAPP unmarshal(String template) throws FalconException {
- return unmarshal(template, OozieUtils.COORD_JAXB_CONTEXT, COORDINATORAPP.class);
- }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index f00290e..9ca0ac1 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Output;
@@ -36,9 +37,11 @@ import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.service.FalconPathFilter;
import org.apache.falcon.service.SharedLibraryHostingService;
import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +73,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
public static final String ENTITY_PATH = "ENTITY_PATH";
public static final String ENTITY_NAME = "ENTITY_NAME";
+ protected static final String IGNORE = "IGNORE";
private static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
@Override
@@ -117,12 +121,12 @@ public abstract class OozieEntityBuilder<T extends Entity> {
}
public static OozieEntityBuilder get(Entity entity) {
- switch(entity.getEntityType()) {
+ switch (entity.getEntityType()) {
case FEED:
return new FeedBundleBuilder((Feed) entity);
case PROCESS:
- return new ProcessBundleBuilder((Process)entity);
+ return new ProcessBundleBuilder((Process) entity);
default:
}
@@ -145,6 +149,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
outPath.toUri(), ClusterHelper.getConfiguration(cluster));
OutputStream out = fs.create(outPath);
+
try {
marshaller.marshal(jaxbElement, out);
} finally {
@@ -158,11 +163,24 @@ public abstract class OozieEntityBuilder<T extends Entity> {
}
}
+ protected Properties createAppProperties(Cluster cluster, String wfName) throws FalconException {
+ Properties properties = getEntityProperties(cluster);
+ properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
+ properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
+ properties.setProperty("colo.name", cluster.getColo());
+
+ properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
+ properties.setProperty("falcon.libpath",
+ ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/lib");
+
+ return properties;
+ }
+
protected Properties getHiveCredentials(Cluster cluster) {
String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
if (metaStoreUrl == null) {
throw new IllegalStateException(
- "Registry interface is not defined in cluster: " + cluster.getName());
+ "Registry interface is not defined in cluster: " + cluster.getName());
}
Properties hiveCredentials = new Properties();
@@ -173,7 +191,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
if (isSecurityEnabled) {
String principal = ClusterHelper
- .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL);
+ .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL);
hiveCredentials.put(METASTORE_KERBEROS_PRINCIPAL, principal);
hiveCredentials.put(METASTORE_USE_THRIFT_SASL, "true");
hiveCredentials.put("hcat.metastore.principal", principal);
@@ -236,9 +254,9 @@ public abstract class OozieEntityBuilder<T extends Entity> {
//pig and java actions require partition expression as "key1=val1, key2=val2"
props.put(prefix + "_partitions_pig",
- "${coord:dataOutPartitions('" + output.getName() + "')}");
+ "${coord:dataOutPartitions('" + output.getName() + "')}");
props.put(prefix + "_partitions_java",
- "${coord:dataOutPartitions('" + output.getName() + "')}");
+ "${coord:dataOutPartitions('" + output.getName() + "')}");
//hive requires partition expression as "key1='val1', key2='val2'" (with quotes around values)
//there is no direct EL expression in oozie
@@ -246,7 +264,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
for (String key : tableStorage.getDatedPartitionKeys()) {
StringBuilder expr = new StringBuilder();
expr.append("${coord:dataOutPartitionValue('").append(output.getName()).append("', '").append(key)
- .append("')}");
+ .append("')}");
props.put(prefix + "_dated_partition_value_" + key, expr.toString());
partitions.add(key + "='" + expr + "'");
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 49f9e07..f8220ec 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -37,6 +37,7 @@ import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder;
import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder;
import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder;
import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
import org.apache.falcon.oozie.workflow.CREDENTIAL;
import org.apache.falcon.oozie.workflow.CREDENTIALS;
import org.apache.falcon.oozie.workflow.END;
@@ -46,17 +47,23 @@ import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import javax.xml.bind.JAXBElement;
+import javax.xml.namespace.QName;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -79,9 +86,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
private static final String PREPROCESS_TEMPLATE = "/action/pre-process.xml";
public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList(
- new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, }));
+ new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, }));
- private final LifeCycle lifecycle;
+ private LifeCycle lifecycle;
+
+ protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
+ protected static final String MR_QUEUE_NAME = "queueName";
+ protected static final String MR_JOB_PRIORITY = "jobPriority";
public OozieOrchestrationWorkflowBuilder(T entity, LifeCycle lifecycle) {
super(entity);
@@ -96,6 +107,10 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
return lifecycle.getTag();
}
+ public OozieOrchestrationWorkflowBuilder(T entity) {
+ super(entity);
+ }
+
public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle)
throws FalconException {
switch (entity.getEntityType()) {
@@ -115,7 +130,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
default:
throw new IllegalArgumentException("Unhandled type " + entity.getEntityType()
- + ", lifecycle " + lifecycle);
+ + ", lifecycle " + lifecycle);
}
case PROCESS:
@@ -192,7 +207,19 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
protected Path marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
return marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
- OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
+ OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
+ }
+
+ protected Path marshal(Cluster cluster, WORKFLOWAPP workflowapp, CONFIGURATION config, Path outPath)
+ throws FalconException {
+ QName workflowQName = new org.apache.falcon.oozie.workflow.ObjectFactory()
+ .createWorkflowApp(workflowapp).getName();
+ JAXBElement<CONFIGURATION> configJaxbElement =
+ new JAXBElement(new QName(workflowQName.getNamespaceURI(), "configuration", workflowQName.getPrefix()),
+ CONFIGURATION.class, config);
+
+ return marshal(cluster, configJaxbElement, OozieUtils.CONFIG_JAXB_CONTEXT,
+ new Path(outPath, "config-default.xml"));
}
protected WORKFLOWAPP unmarshal(String template) throws FalconException {
@@ -212,13 +239,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException {
String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext";
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
- ClusterHelper.getConfiguration(cluster));
+ ClusterHelper.getConfiguration(cluster));
try {
addExtensionJars(fs, new Path(libext), wf);
addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf);
if (tag != null) {
addExtensionJars(fs, new Path(libext, entity.getEntityType().name() + "/" + tag.name().toLowerCase()),
- wf);
+ wf);
}
} catch (IOException e) {
throw new FalconException(e);
@@ -316,7 +343,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
* @param cluster cluster entity
*/
protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster, String credentialName,
- Set<String> actions) {
+ Set<String> actions) {
addHCatalogCredentials(workflowApp, cluster, credentialName);
// add credential to each action
@@ -349,7 +376,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
credential.getProperty().add(createProperty("hcat.metastore.uri", metaStoreUrl));
credential.getProperty().add(createProperty("hcat.metastore.principal",
- ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL)));
+ ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL)));
return credential;
}
@@ -366,4 +393,69 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
}
+
+ public Properties createDefaultConfiguration(Cluster cluster) throws FalconException {
+ Properties props = new Properties();
+ props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName());
+ props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
+ props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
+ props.put("falconDataOperation", getOperation().name());
+
+ props.put(WorkflowExecutionArgs.LOG_DIR.getName(),
+ getStoragePath(EntityUtil.getLogPath(cluster, entity)));
+ props.put(WorkflowExecutionArgs.WF_ENGINE_URL.getName(), ClusterHelper.getOozieUrl(cluster));
+
+ addLateDataProperties(props);
+ addBrokerProperties(cluster, props);
+
+ props.put(MR_QUEUE_NAME, "default");
+ props.put(MR_JOB_PRIORITY, "NORMAL");
+
+ //props in entity override the set props.
+ props.putAll(getEntityProperties(entity));
+ props.putAll(createAppProperties(cluster, entity.getName()));
+ return props;
+ }
+
+ private void addLateDataProperties(Properties props) throws FalconException {
+ if (EntityUtil.getLateProcess(entity) == null
+ || EntityUtil.getLateProcess(entity).getLateInputs() == null
+ || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
+ props.put("shouldRecord", "false");
+ } else {
+ props.put("shouldRecord", "true");
+ }
+ }
+
+ private void addBrokerProperties(Cluster cluster, Properties props) {
+ props.put(WorkflowExecutionArgs.USER_BRKR_URL.getName(),
+ ClusterHelper.getMessageBrokerUrl(cluster));
+ props.put(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(),
+ ClusterHelper.getMessageBrokerImplClass(cluster));
+
+ String falconBrokerUrl = StartupProperties.get().getProperty(
+ "broker.url", "tcp://localhost:61616?daemon=true");
+ props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);
+
+ String falconBrokerImplClass = StartupProperties.get().getProperty(
+ "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+ props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);
+
+ String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
+ DEFAULT_BROKER_MSG_TTL.toString());
+ props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
+ }
+
+ protected abstract WorkflowExecutionContext.EntityOperations getOperation();
+
+ protected CONFIGURATION getConfig(Properties props) {
+ CONFIGURATION conf = new CONFIGURATION();
+ for (Map.Entry<Object, Object> prop : props.entrySet()) {
+ CONFIGURATION.Property confProp = new CONFIGURATION.Property();
+ confProp.setName((String) prop.getKey());
+ confProp.setValue((String) prop.getValue());
+ conf.getProperty().add(confProp);
+ }
+ return conf;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index 1d97204..0381e59 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -27,6 +27,7 @@ import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import java.util.Arrays;
+import java.util.Properties;
/**
* Builds replication workflow for filesystem based feed.
@@ -71,4 +72,15 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder
decorateWorkflow(workflow, wfName, start);
return workflow;
}
+
+ protected Properties getWorkflowProperties(Feed feed) throws FalconException {
+ Properties props = super.getWorkflowProperties(feed);
+ if (entity.getAvailabilityFlag() == null) {
+ props.put("availabilityFlag", "NA");
+ } else {
+ props.put("availabilityFlag", entity.getAvailabilityFlag());
+ }
+
+ return props;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index de6f373..f5cc2c3 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -45,7 +45,6 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.coordinator.WORKFLOW;
import org.apache.falcon.oozie.coordinator.ACTION;
-import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.conf.Configuration;
@@ -73,8 +72,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
private static final String PARALLEL = "parallel";
private static final String TIMEOUT = "timeout";
- private static final String MR_MAX_MAPS = "maxMaps";
- private static final String MR_MAP_BANDWIDTH = "mapBandwidth";
private static final String ORDER = "order";
public FeedReplicationCoordinatorBuilder(Feed entity) {
@@ -101,18 +98,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
return null;
}
- @Override
- protected WorkflowExecutionContext.EntityOperations getOperation() {
- return WorkflowExecutionContext.EntityOperations.REPLICATE;
- }
-
private Properties doBuild(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
-
- // Different workflow for each source since hive credentials vary for each cluster
- OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, trgCluster,
- Tag.REPLICATION);
- Properties wfProps = builder.build(trgCluster, buildPath);
-
long replicationDelayInMillis = getReplicationDelayInMillis(srcCluster);
Date sourceStartDate = getStartDate(srcCluster, replicationDelayInMillis);
Date sourceEndDate = getEndDate(srcCluster);
@@ -127,6 +113,11 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
return null;
}
+ // Different workflow for each source since hive credentials vary for each cluster
+ OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, trgCluster,
+ Tag.REPLICATION);
+ Properties wfProps = builder.build(trgCluster, buildPath);
+
COORDINATORAPP coord = unmarshal(REPLICATION_COORD_TEMPLATE);
String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(srcCluster.getName()),
@@ -155,24 +146,18 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
}
private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path buildPath,
- String wfName, Storage sourceStorage, Storage targetStorage) throws FalconException {
+ String wfName, Storage sourceStorage, Storage targetStorage) throws FalconException {
ACTION action = new ACTION();
WORKFLOW workflow = new WORKFLOW();
workflow.setAppPath(getStoragePath(buildPath));
- Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
+ Properties props = createCoordDefaultConfiguration(wfName);
// Override CLUSTER_NAME property to include both source and target cluster pair
String clusterProperty = trgCluster.getName()
+ WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty);
props.put("srcClusterName", srcCluster.getName());
props.put("srcClusterColo", srcCluster.getColo());
- if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
- props.put(MR_MAX_MAPS, getDefaultMaxMaps());
- }
- if (props.get(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden
- props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
- }
// the storage type is uniform across source and target feeds for replication
props.put("falconFeedStorageType", sourceStorage.getType().name());
@@ -183,12 +168,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
instancePaths = pathsWithPartitions;
propagateFileSystemCopyProperties(pathsWithPartitions, props);
-
- if (entity.getAvailabilityFlag() == null) {
- props.put("availabilityFlag", "NA");
- } else {
- props.put("availabilityFlag", entity.getAvailabilityFlag());
- }
} else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
instancePaths = "${coord:dataIn('input')}";
final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
@@ -197,25 +176,17 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
propagateTableCopyProperties(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, props);
setupHiveConfiguration(srcCluster, trgCluster, buildPath);
- props.put("availabilityFlag", "NA");
}
propagateLateDataProperties(instancePaths, sourceStorage.getType().name(), props);
- props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
-
+ // Add the custom properties set in feed. Else, dryrun won't catch any missing props.
+ props.putAll(getEntityProperties(entity));
workflow.setConfiguration(getConfig(props));
action.setWorkflow(workflow);
return action;
}
- private String getDefaultMaxMaps() {
- return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
- }
-
- private String getDefaultMapBandwidth() {
- return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100");
- }
private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster) throws FalconException {
String srcPart = FeedHelper.normalizePartitionExpression(
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index aa936ad..fb41b96 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -23,6 +23,7 @@ import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -31,6 +32,8 @@ import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.CONFIGURATION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;
import java.util.Properties;
@@ -41,6 +44,8 @@ import java.util.Properties;
public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
protected static final String REPLICATION_ACTION_TEMPLATE = "/action/feed/replication-action.xml";
protected static final String REPLICATION_ACTION_NAME = "replication";
+ private static final String MR_MAX_MAPS = "maxMaps";
+ private static final String MR_MAP_BANDWIDTH = "mapBandwidth";
public FeedReplicationWorkflowBuilder(Feed entity) {
super(entity, LifeCycle.REPLICATION);
@@ -56,8 +61,32 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
addLibExtensionsToWorkflow(cluster, workflow, Tag.REPLICATION);
marshal(cluster, workflow, buildPath);
- return getProperties(buildPath, wfName);
+ Properties props = getProperties(buildPath, wfName);
+ props.putAll(createDefaultConfiguration(cluster));
+ if (EntityUtil.isTableStorageType(cluster, entity)) {
+ // todo: kludge send source hcat creds for coord dependency check to pass
+ props.putAll(getHiveCredentials(srcCluster));
+ props.putAll(getHiveCredentials(cluster));
+ }
+ props.putAll(getWorkflowProperties(entity));
+ props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
+ // Write out the config to config-default.xml
+ marshal(cluster, workflow, getConfig(props), buildPath);
+ return props;
}
+
+ protected Properties getWorkflowProperties(Feed feed) throws FalconException {
+ Properties props = FeedHelper.getFeedProperties(feed);
+ if (props.getProperty(MR_MAX_MAPS) == null) { // set default if user has not overridden
+ props.put(MR_MAX_MAPS, getDefaultMaxMaps());
+ }
+ if (props.getProperty(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden
+ props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
+ }
+
+ return props;
+ }
+
protected ACTION addHDFSServersConfig(ACTION action, Cluster sourceCluster, Cluster targetCluster) {
if (isSecurityEnabled) {
// this is to ensure that the delegation tokens are checked out for both clusters
@@ -70,4 +99,17 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
return action;
}
protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException;
+
+ @Override
+ protected WorkflowExecutionContext.EntityOperations getOperation() {
+ return WorkflowExecutionContext.EntityOperations.REPLICATE;
+ }
+
+ private String getDefaultMaxMaps() {
+ return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
+ }
+
+ private String getDefaultMapBandwidth() {
+ return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100");
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
index c896d5a..ce9ef9a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -22,7 +22,6 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -33,8 +32,6 @@ import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.coordinator.ACTION;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.WORKFLOW;
-import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;
import java.util.Arrays;
@@ -73,32 +70,15 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
}
Path coordPath = getBuildPath(buildPath);
- Properties props = createCoordDefaultConfiguration(cluster, coordName);
- props.put("timeZone", entity.getTimezone().getID());
- props.put("frequency", entity.getFrequency().getTimeUnit().name());
-
- final Storage storage = FeedHelper.createStorage(cluster, entity);
- props.put("falconFeedStorageType", storage.getType().name());
-
- String feedDataPath = storage.getUriTemplate();
- props.put("feedDataPath",
- feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
-
- props.put("limit", feedCluster.getRetention().getLimit().toString());
-
- props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
- props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE);
-
- props.put("falconInputFeeds", entity.getName());
- props.put("falconInPaths", IGNORE);
-
- props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
+ Properties props = createCoordDefaultConfiguration(coordName);
WORKFLOW workflow = new WORKFLOW();
- Properties wfProp = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.RETENTION).build(cluster,
+ Properties wfProps = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.RETENTION).build(cluster,
coordPath);
- workflow.setAppPath(getStoragePath(wfProp.getProperty(OozieEntityBuilder.ENTITY_PATH)));
- props.putAll(wfProp);
+ workflow.setAppPath(getStoragePath(wfProps.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+ props.putAll(getProperties(coordPath, coordName));
+ // Add the custom properties set in feed. Else, dryrun won't catch any missing props.
+ props.putAll(getEntityProperties(entity));
workflow.setConfiguration(getConfig(props));
ACTION action = new ACTION();
action.setWorkflow(workflow);
@@ -108,9 +88,4 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
Path marshalPath = marshal(cluster, coord, coordPath);
return Arrays.asList(getProperties(marshalPath, coordName));
}
-
- @Override
- protected WorkflowExecutionContext.EntityOperations getOperation() {
- return WorkflowExecutionContext.EntityOperations.DELETE;
- }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index 51e081f..b56f0dd 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -22,11 +22,15 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;
import java.util.Properties;
@@ -64,20 +68,47 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION);
+ Properties props = getProperties(buildPath, wfName);
+ props.putAll(getWorkflowProperties(cluster));
+ props.putAll(createDefaultConfiguration(cluster));
+ props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
+
if (EntityUtil.isTableStorageType(cluster, entity)) {
setupHiveCredentials(cluster, buildPath, workflow);
+ // todo: kludge send hcat creds for coord dependency check to pass
+ props.putAll(getHiveCredentials(cluster));
}
marshal(cluster, workflow, buildPath);
- Properties props = getProperties(buildPath, wfName);
- props.putAll(getWorkflowProperties());
+
+ // Write out the config to config-default.xml
+ marshal(cluster, workflow, getConfig(props), buildPath);
return props;
}
- private Properties getWorkflowProperties() {
+ private Properties getWorkflowProperties(Cluster cluster) throws FalconException {
Properties props = new Properties();
props.setProperty("srcClusterName", "NA");
props.setProperty("availabilityFlag", "NA");
+
+ props.put("timeZone", entity.getTimezone().getID());
+ props.put("frequency", entity.getFrequency().getTimeUnit().name());
+
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+ final Storage storage = FeedHelper.createStorage(cluster, entity);
+ props.put("falconFeedStorageType", storage.getType().name());
+
+ String feedDataPath = storage.getUriTemplate();
+ props.put("feedDataPath",
+ feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+
+ props.put("limit", feedCluster.getRetention().getLimit().toString());
+
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE);
+
+ props.put("falconInputFeeds", entity.getName());
+ props.put("falconInPaths", IGNORE);
return props;
}
@@ -110,4 +141,9 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
}
}
}
+
+ @Override
+ protected WorkflowExecutionContext.EntityOperations getOperation() {
+ return WorkflowExecutionContext.EntityOperations.DELETE;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index 72bbca4..347ddaf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -27,6 +27,7 @@ import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import java.util.Arrays;
+import java.util.Properties;
/**
* Builds replication workflow for hcat based feed.
@@ -135,4 +136,11 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
}
}
}
+
+ protected Properties getWorkflowProperties(Feed feed) throws FalconException {
+ Properties props = super.getWorkflowProperties(feed);
+ props.put("availabilityFlag", "NA");
+
+ return props;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index 60f9fe1..d6d42e1 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -36,7 +36,6 @@ import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.oozie.OozieCoordinatorBuilder;
import org.apache.falcon.oozie.OozieEntityBuilder;
@@ -51,7 +50,6 @@ import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.coordinator.WORKFLOW;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
@@ -82,37 +80,31 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
coord.setControls(controls);
// Configuration
- Properties props = createCoordDefaultConfiguration(cluster, coordName);
+ Properties props = createCoordDefaultConfiguration(coordName);
initializeInputPaths(cluster, coord, props); // inputs
initializeOutputPaths(cluster, coord, props); // outputs
- Workflow processWorkflow = entity.getWorkflow();
- propagateUserWorkflowProperties(processWorkflow, props);
-
// create parent wf
Properties wfProps = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT).build(cluster,
coordPath);
WORKFLOW wf = new WORKFLOW();
wf.setAppPath(getStoragePath(wfProps.getProperty(OozieEntityBuilder.ENTITY_PATH)));
- props.putAll(wfProps);
+ // Add the custom properties set in the process. Else, dryrun won't catch any missing props.
+ props.putAll(getEntityProperties(entity));
wf.setConfiguration(getConfig(props));
// set coord action to parent wf
org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
action.setWorkflow(wf);
+
coord.setAction(action);
Path marshalPath = marshal(cluster, coord, coordPath);
return Arrays.asList(getProperties(marshalPath, coordName));
}
- @Override
- protected WorkflowExecutionContext.EntityOperations getOperation() {
- return WorkflowExecutionContext.EntityOperations.GENERATE;
- }
-
private void initializeCoordAttributes(Cluster cluster, COORDINATORAPP coord, String coordName) {
coord.setName(coordName);
org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(entity,
@@ -351,12 +343,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
}
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
- private void propagateUserWorkflowProperties(Workflow processWorkflow, Properties props) {
- props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
- processWorkflow.getName(), entity.getName()));
- props.put("userWorkflowVersion", processWorkflow.getVersion());
- props.put("userWorkflowEngine", processWorkflow.getEngine().value());
- }
+
protected void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage, Properties props) {
String prefix = "falcon_" + input.getName();
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 8b18ecc..ac436ca 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -33,15 +34,18 @@ import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.CONFIGURATION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.oozie.client.OozieClient;
import java.io.IOException;
import java.util.ArrayList;
@@ -102,8 +106,21 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
}
marshal(cluster, wfApp, buildPath);
- Properties props = getProperties(buildPath, wfName);
+ Properties props = createDefaultConfiguration(cluster);
+ props.putAll(getProperties(buildPath, wfName));
props.putAll(getWorkflowProperties());
+ props.setProperty(OozieClient.APP_PATH, buildPath.toString());
+
+ //Add libpath
+ Path libPath = new Path(buildPath, "lib");
+ copySharedLibs(cluster, libPath);
+ props.put(OozieClient.LIBPATH, libPath.toString());
+
+ Workflow processWorkflow = ((Process)(entity)).getWorkflow();
+ propagateUserWorkflowProperties(processWorkflow, props);
+
+ // Write out the config to config-default.xml
+ marshal(cluster, wfApp, getConfig(props), buildPath);
return props;
}
@@ -251,4 +268,16 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
}
}
+
+ private void propagateUserWorkflowProperties(Workflow processWorkflow, Properties props) {
+ props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
+ processWorkflow.getName(), entity.getName()));
+ props.put("userWorkflowVersion", processWorkflow.getVersion());
+ props.put("userWorkflowEngine", processWorkflow.getEngine().value());
+ }
+
+ @Override
+ protected WorkflowExecutionContext.EntityOperations getOperation() {
+ return WorkflowExecutionContext.EntityOperations.GENERATE;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
index 0ae229c..149a7e6 100644
--- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
+++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
@@ -20,6 +20,7 @@ package org.apache.falcon.util;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.hive.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.hadoop.conf.Configuration;
import org.apache.xerces.dom.ElementNSImpl;
@@ -43,6 +44,7 @@ public final class OozieUtils {
public static final JAXBContext ACTION_JAXB_CONTEXT;
public static final JAXBContext COORD_JAXB_CONTEXT;
public static final JAXBContext BUNDLE_JAXB_CONTEXT;
+ public static final JAXBContext CONFIG_JAXB_CONTEXT;
protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
static {
@@ -51,6 +53,7 @@ public final class OozieUtils {
ACTION_JAXB_CONTEXT = JAXBContext.newInstance(org.apache.falcon.oozie.workflow.ACTION.class);
COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class);
+ CONFIG_JAXB_CONTEXT = JAXBContext.newInstance(CONFIGURATION.class);
HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName());
} catch (JAXBException e) {
@@ -72,7 +75,7 @@ public final class OozieUtils {
}
@SuppressWarnings("unchecked")
- public static JAXBElement<ACTION> unMarshalHiveAction(org.apache.falcon.oozie.workflow.ACTION wfAction) {
+ public static JAXBElement<ACTION> unMarshalHiveAction(org.apache.falcon.oozie.workflow.ACTION wfAction) {
try {
Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller();
unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
@@ -94,5 +97,4 @@ public final class OozieUtils {
throw new RuntimeException("Unable to marshall hive action.", e);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index b223447..2f7787d 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -201,9 +201,12 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
HashMap<String, String> props = getCoordProperties(coord);
- verifyEntityProperties(feed, trgCluster, srcCluster,
+ verifyEntityProperties(trgCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
- verifyBrokerProperties(trgCluster, props);
+
+ HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
+ verifyEntityProperties(feed, trgCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, wfProps);
+ verifyBrokerProperties(trgCluster, wfProps);
// verify the replication param that feed replicator depends on
String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
@@ -226,15 +229,15 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
// verify workflow params
- Assert.assertEquals(props.get("userWorkflowName"), "replication-policy");
- Assert.assertEquals(props.get("userWorkflowVersion"), "0.6");
- Assert.assertEquals(props.get("userWorkflowEngine"), "falcon");
+ Assert.assertEquals(wfProps.get("userWorkflowName"), "replication-policy");
+ Assert.assertEquals(wfProps.get("userWorkflowVersion"), "0.6");
+ Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon");
// verify default params
- Assert.assertEquals(props.get("queueName"), "default");
- Assert.assertEquals(props.get("jobPriority"), "NORMAL");
- Assert.assertEquals(props.get("maxMaps"), "5");
- Assert.assertEquals(props.get("mapBandwidth"), "100");
+ Assert.assertEquals(wfProps.get("queueName"), "default");
+ Assert.assertEquals(wfProps.get("jobPriority"), "NORMAL");
+ Assert.assertEquals(wfProps.get("maxMaps"), "5");
+ Assert.assertEquals(wfProps.get("mapBandwidth"), "100");
assertLibExtensions(coord, "replication");
WORKFLOWAPP wf = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
@@ -340,12 +343,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
- Assert.assertEquals(props.get("maxMaps"), "33");
- Assert.assertEquals(props.get("mapBandwidth"), "2");
- verifyEntityProperties(aFeed, aCluster, srcCluster,
+ verifyEntityProperties(aCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
- verifyBrokerProperties(trgCluster, props);
+
+ HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
+ verifyEntityProperties(aFeed, aCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, wfProps);
+ verifyBrokerProperties(aCluster, wfProps);
+
+ Assert.assertEquals(wfProps.get("maxMaps"), "33");
+ Assert.assertEquals(wfProps.get("mapBandwidth"), "2");
}
public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP workflow, boolean isTable) {
@@ -484,9 +491,9 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
wfPath.toString());
- verifyEntityProperties(tableFeed, trgCluster, srcCluster,
- WorkflowExecutionContext.EntityOperations.REPLICATE, props);
- verifyBrokerProperties(trgCluster, props);
+ HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
+ verifyEntityProperties(tableFeed, trgCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, wfProps);
+ verifyBrokerProperties(trgCluster, wfProps);
}
private void assertReplicationHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
@@ -592,10 +599,13 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
HashMap<String, String> props = getCoordProperties(coord);
- String feedDataPath = props.get("feedDataPath");
- String storageType = props.get("falconFeedStorageType");
+ HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
+
+ String feedDataPath = wfProps.get("feedDataPath");
+ String storageType = wfProps.get("falconFeedStorageType");
// verify the param that feed evictor depends on
+
Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name());
final Storage storage = FeedHelper.createStorage(cluster, feed);
@@ -609,8 +619,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
}
// verify the post processing params
- Assert.assertEquals(props.get("feedNames"), feed.getName());
- Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+ Assert.assertEquals(wfProps.get("feedNames"), feed.getName());
+ Assert.assertEquals(wfProps.get("feedInstancePaths"), "IGNORE");
assertWorkflowRetries(getWorkflowapp(srcMiniDFS.getFileSystem(), coord));
@@ -651,8 +661,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
HashMap<String, String> props = getCoordProperties(coord);
- String feedDataPath = props.get("feedDataPath");
- String storageType = props.get("falconFeedStorageType");
+ HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
+
+ String feedDataPath = wfProps.get("feedDataPath");
+ String storageType = wfProps.get("falconFeedStorageType");
// verify the param that feed evictor depends on
Assert.assertEquals(storageType, Storage.TYPE.TABLE.name());
@@ -668,13 +680,13 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
}
// verify the post processing params
- Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
- Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+ Assert.assertEquals(wfProps.get("feedNames"), tableFeed.getName());
+ Assert.assertEquals(wfProps.get("feedInstancePaths"), "IGNORE");
assertWorkflowRetries(coord);
- verifyBrokerProperties(srcCluster, props);
+ verifyBrokerProperties(srcCluster, wfProps);
verifyEntityProperties(tableFeed, trgCluster,
- WorkflowExecutionContext.EntityOperations.DELETE, props);
+ WorkflowExecutionContext.EntityOperations.DELETE, wfProps);
Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
assertHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index 6488682..ce76594 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -31,10 +31,11 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
@@ -46,6 +47,9 @@ import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
@@ -116,7 +120,7 @@ public class AbstractTestBase {
Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-coordinator-0.3.xsd"));
unmarshaller.setSchema(schema);
JAXBElement<COORDINATORAPP> jaxbBundle = unmarshaller.unmarshal(
- new StreamSource(new ByteArrayInputStream(coordStr.trim().getBytes())), COORDINATORAPP.class);
+ new StreamSource(new ByteArrayInputStream(coordStr.trim().getBytes())), COORDINATORAPP.class);
return jaxbBundle.getValue();
}
@@ -128,7 +132,7 @@ public class AbstractTestBase {
Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-bundle-0.1.xsd"));
unmarshaller.setSchema(schema);
JAXBElement<BUNDLEAPP> jaxbBundle = unmarshaller.unmarshal(
- new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), BUNDLEAPP.class);
+ new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), BUNDLEAPP.class);
return jaxbBundle.getValue();
}
@@ -153,7 +157,7 @@ public class AbstractTestBase {
}
protected void assertLibExtensions(FileSystem fs, COORDINATORAPP coord, EntityType type,
- String lifecycle) throws Exception {
+ String lifecycle) throws Exception {
WORKFLOWAPP wf = getWorkflowapp(fs, coord);
List<Object> actions = wf.getDecisionOrForkOrJoin();
String lifeCyclePath = lifecycle == null ? "" : "/" + lifecycle;
@@ -172,7 +176,7 @@ public class AbstractTestBase {
}
if (files != null) {
Assert.assertTrue(files.get(files.size() - 1).endsWith(
- "/projects/falcon/working/libext/" + type.name() + lifeCyclePath + "/ext.jar"));
+ "/projects/falcon/working/libext/" + type.name() + lifeCyclePath + "/ext.jar"));
}
}
}
@@ -209,36 +213,57 @@ public class AbstractTestBase {
protected HashMap<String, String> getCoordProperties(COORDINATORAPP coord) {
HashMap<String, String> props = new HashMap<String, String>();
- for (CONFIGURATION.Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ for (org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop
+ : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
props.put(prop.getName(), prop.getValue());
}
return props;
}
- protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster,
+ protected HashMap<String, String> getWorkflowProperties(FileSystem fs, COORDINATORAPP coord)
+ throws JAXBException, IOException, XMLStreamException {
+
+ String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+ StreamSource xml = new StreamSource(fs.open(new Path(wfPath + "/config-default.xml")));
+ XMLInputFactory xif = XMLInputFactory.newFactory();
+ XMLStreamReader xsr = xif.createXMLStreamReader(xml);
+ JAXBContext jaxbContext = OozieUtils.CONFIG_JAXB_CONTEXT;
+ CONFIGURATION jaxbConfig = ((JAXBElement<CONFIGURATION>) jaxbContext.createUnmarshaller().
+ unmarshal(xsr, CONFIGURATION.class)).getValue();
+
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (CONFIGURATION.Property prop : jaxbConfig.getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+ return props;
+ }
+
+ protected void verifyEntityProperties(Cluster cluster, Cluster srcCluster,
WorkflowExecutionContext.EntityOperations operation,
HashMap<String, String> props) throws Exception {
- Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
- entity.getName());
- Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
- entity.getEntityType().name());
if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) {
Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()),
cluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName());
} else {
Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
}
- Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
- Assert.assertEquals(props.get("falconDataOperation"), operation.name());
}
protected void verifyEntityProperties(Entity entity, Cluster cluster,
WorkflowExecutionContext.EntityOperations operation,
HashMap<String, String> props) throws Exception {
- verifyEntityProperties(entity, cluster, null, operation, props);
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
+ entity.getName());
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
+ entity.getEntityType().name());
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
+ Assert.assertEquals(props.get("falconDataOperation"), operation.name());
+ Assert.assertTrue(props.containsKey(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName()));
+ Assert.assertTrue(props.containsKey(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName()));
+ Assert.assertTrue(props.containsKey(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName()));
}
- private String getLogPath(Cluster cluster, Entity entity) {
+ protected String getLogPath(Cluster cluster, Entity entity) {
Path logPath = EntityUtil.getLogPath(cluster, entity);
return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 4e5c3f0..3aaf304 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -188,14 +188,15 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
HashMap<String, String> props = getCoordProperties(coord);
- assertEquals(props.get("mapred.job.priority"), "LOW");
+ HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
+ assertEquals(wfProps.get("mapred.job.priority"), "LOW");
List<Input> inputs = process.getInputs().getInputs();
assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), inputs.get(0).getName() + "#" + inputs
.get(1).getName());
verifyEntityProperties(process, cluster,
- WorkflowExecutionContext.EntityOperations.GENERATE, props);
- verifyBrokerProperties(cluster, props);
+ WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+ verifyBrokerProperties(cluster, wfProps);
assertLibExtensions(fs, coord, EntityType.PROCESS, null);
}
@@ -285,10 +286,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
HashMap<String, String> props = getCoordProperties(coord);
+ HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
verifyEntityProperties(process, cluster,
- WorkflowExecutionContext.EntityOperations.GENERATE, props);
- verifyBrokerProperties(cluster, props);
+ WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+ verifyBrokerProperties(cluster, wfProps);
// verify table and hive props
Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
@@ -347,11 +349,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
- HashMap<String, String> props = getCoordProperties(coord);
+ HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
verifyEntityProperties(process, cluster,
- WorkflowExecutionContext.EntityOperations.GENERATE, props);
- verifyBrokerProperties(cluster, props);
+ WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+ verifyBrokerProperties(cluster, wfProps);
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -401,11 +403,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
- HashMap<String, String> props = getCoordProperties(coord);
+ HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
verifyEntityProperties(process, cluster,
- WorkflowExecutionContext.EntityOperations.GENERATE, props);
- verifyBrokerProperties(cluster, props);
+ WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+ verifyBrokerProperties(cluster, wfProps);
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -451,10 +453,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
- HashMap<String, String> props = getCoordProperties(coord);
+ HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
verifyEntityProperties(process, cluster,
- WorkflowExecutionContext.EntityOperations.GENERATE, props);
- verifyBrokerProperties(cluster, props);
+ WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+ verifyBrokerProperties(cluster, wfProps);
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -550,9 +552,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
}
}
+
+ HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
verifyEntityProperties(process, cluster,
- WorkflowExecutionContext.EntityOperations.GENERATE, props);
- verifyBrokerProperties(cluster, props);
+ WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+ verifyBrokerProperties(cluster, wfProps);
// verify the late data params
Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
@@ -684,9 +688,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
HashMap<String, String> props = getCoordProperties(coord);
+ HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
verifyEntityProperties(processEntity, cluster,
- WorkflowExecutionContext.EntityOperations.GENERATE, props);
- verifyBrokerProperties(cluster, props);
+ WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+ verifyBrokerProperties(cluster, wfProps);
String[] expected = {
WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(),
@@ -694,9 +699,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(),
WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(),
WorkflowExecutionArgs.INPUT_NAMES.getName(),
- WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
- WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
- WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
};
for (String property : expected) {
@@ -726,9 +728,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
HashMap<String, String> props = getCoordProperties(coord);
+ HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
verifyEntityProperties(processEntity, cluster,
- WorkflowExecutionContext.EntityOperations.GENERATE, props);
- verifyBrokerProperties(cluster, props);
+ WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+ verifyBrokerProperties(cluster, wfProps);
Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "clicks");
Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "IGNORE");
@@ -756,9 +759,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
HashMap<String, String> props = getCoordProperties(coord);
+ HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
verifyEntityProperties(processEntity, cluster,
- WorkflowExecutionContext.EntityOperations.GENERATE, props);
- verifyBrokerProperties(cluster, props);
+ WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+ verifyBrokerProperties(cluster, wfProps);
Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "impressions");
Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "NONE");