Posted to commits@falcon.apache.org by sh...@apache.org on 2014/07/10 08:57:35 UTC
[6/9] FALCON-369 Refactor workflow builder. Contributed by Shwetha GS
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
new file mode 100644
index 0000000..c87bc86
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.falcon.oozie.OozieEntityBuilder;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.coordinator.CONTROLS;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.DATAIN;
+import org.apache.falcon.oozie.coordinator.DATAOUT;
+import org.apache.falcon.oozie.coordinator.DATASETS;
+import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Builds the Oozie coordinator for a process.
+ */
+public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<Process> {
+ private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+
+ public ProcessExecutionCoordinatorBuilder(Process entity) {
+ super(entity, Tag.DEFAULT);
+ }
+
+ @Override public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
+ String coordName = getEntityName();
+ Path coordPath = getBuildPath(buildPath);
+ copySharedLibs(cluster, new Path(coordPath, "lib"));
+
+ COORDINATORAPP coord = new COORDINATORAPP();
+ // coord attributes
+ initializeCoordAttributes(cluster, coord, coordName);
+
+ CONTROLS controls = initializeControls(); // controls
+ coord.setControls(controls);
+
+ // Configuration
+ Properties props = createCoordDefaultConfiguration(cluster, coordName);
+
+ initializeInputPaths(cluster, coord, props); // inputs
+ initializeOutputPaths(cluster, coord, props); // outputs
+
+ Workflow processWorkflow = entity.getWorkflow();
+ propagateUserWorkflowProperties(processWorkflow, props);
+
+ // create parent wf
+ Properties wfProps = OozieOrchestrationWorkflowBuilder.get(entity, Tag.DEFAULT).build(cluster, coordPath);
+
+ WORKFLOW wf = new WORKFLOW();
+ wf.setAppPath(getStoragePath(wfProps.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+ wf.setConfiguration(getConfig(props));
+
+ // set coord action to parent wf
+ org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
+ action.setWorkflow(wf);
+ coord.setAction(action);
+
+ marshal(cluster, coord, coordPath);
+ return Arrays.asList(getProperties(coordPath, coordName));
+ }
+
+ private void initializeCoordAttributes(Cluster cluster, COORDINATORAPP coord, String coordName) {
+ coord.setName(coordName);
+ org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(entity,
+ cluster.getName());
+ coord.setStart(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()));
+ coord.setEnd(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()));
+ coord.setTimezone(entity.getTimezone().getID());
+ coord.setFrequency("${coord:" + entity.getFrequency().toString() + "}");
+ }
+
+ private CONTROLS initializeControls()
+ throws FalconException {
+ CONTROLS controls = new CONTROLS();
+ controls.setConcurrency(String.valueOf(entity.getParallel()));
+ controls.setExecution(entity.getOrder().name());
+
+ Frequency timeout = entity.getTimeout();
+ long frequencyInMillis = ExpressionHelper.get().evaluate(entity.getFrequency().toString(), Long.class);
+ long timeoutInMillis;
+ if (timeout != null) {
+ timeoutInMillis = ExpressionHelper.get().
+ evaluate(entity.getTimeout().toString(), Long.class);
+ } else {
+ timeoutInMillis = frequencyInMillis * 6;
+ if (timeoutInMillis < THIRTY_MINUTES) {
+ timeoutInMillis = THIRTY_MINUTES;
+ }
+ }
+ controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+
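+ // throttle = 2 * (timeout / frequency); integer division makes the guard false (no throttle set) when timeout < frequency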
+ if (timeoutInMillis / frequencyInMillis * 2 > 0) {
+ controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+ }
+
+ return controls;
+ }
+
+ private void initializeInputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
+ if (entity.getInputs() == null) {
+ props.put("falconInputFeeds", "NONE");
+ props.put("falconInPaths", IGNORE);
+ return;
+ }
+
+ List<String> inputFeeds = new ArrayList<String>();
+ List<String> inputPaths = new ArrayList<String>();
+ List<String> inputFeedStorageTypes = new ArrayList<String>();
+ for (Input input : entity.getInputs().getInputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+
+ if (!input.isOptional()) {
+ if (coord.getDatasets() == null) {
+ coord.setDatasets(new DATASETS());
+ }
+ if (coord.getInputEvents() == null) {
+ coord.setInputEvents(new INPUTEVENTS());
+ }
+
+ SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
+ coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
+ DATAIN datain = createDataIn(input);
+ coord.getInputEvents().getDataIn().add(datain);
+ }
+
+ String inputExpr = null;
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
+ props.put(input.getName(), inputExpr);
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ inputExpr = "${coord:dataIn('" + input.getName() + "')}";
+ propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
+ }
+
+ inputFeeds.add(feed.getName());
+ inputPaths.add(inputExpr);
+ inputFeedStorageTypes.add(storage.getType().name());
+ }
+
+ propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
+ }
+
+ private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
+ List<String> inputFeedStorageTypes, Properties props) {
+ // populate late data handler - should-record action
+ props.put("falconInputFeeds", StringUtils.join(inputFeeds, '#'));
+ props.put("falconInPaths", StringUtils.join(inputPaths, '#'));
+
+ // storage type for each corresponding feed sent as a param to LateDataHandler
+ // needed to compute usage based on storage type in LateDataHandler
+ props.put("falconInputFeedStorageTypes", StringUtils.join(inputFeedStorageTypes, '#'));
+ }
+
+ private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
+ String datasetName, LocationType locationType) throws FalconException {
+ SYNCDATASET syncdataset = new SYNCDATASET();
+ syncdataset.setName(datasetName);
+ syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+
+ String uriTemplate = storage.getUriTemplate(locationType);
+ if (storage.getType() == Storage.TYPE.TABLE) {
+ uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+ }
+ syncdataset.setUriTemplate(uriTemplate);
+
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+ syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
+ syncdataset.setTimezone(feed.getTimezone().getID());
+
+ if (feed.getAvailabilityFlag() == null) {
+ syncdataset.setDoneFlag("");
+ } else {
+ syncdataset.setDoneFlag(feed.getAvailabilityFlag());
+ }
+
+ return syncdataset;
+ }
+
+ private DATAIN createDataIn(Input input) {
+ DATAIN datain = new DATAIN();
+ datain.setName(input.getName());
+ datain.setDataset(input.getName());
+ datain.setStartInstance(getELExpression(input.getStart()));
+ datain.setEndInstance(getELExpression(input.getEnd()));
+ return datain;
+ }
+
+ private String getELExpression(String expr) {
+ if (expr != null) {
+ expr = "${" + expr + "}";
+ }
+ return expr;
+ }
+
+ private void initializeOutputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
+ if (entity.getOutputs() == null) {
+ props.put(ARG.feedNames.getPropName(), "NONE");
+ props.put(ARG.feedInstancePaths.getPropName(), IGNORE);
+ return;
+ }
+
+ if (coord.getDatasets() == null) {
+ coord.setDatasets(new DATASETS());
+ }
+
+ if (coord.getOutputEvents() == null) {
+ coord.setOutputEvents(new OUTPUTEVENTS());
+ }
+
+ List<String> outputFeeds = new ArrayList<String>();
+ List<String> outputPaths = new ArrayList<String>();
+ for (Output output : entity.getOutputs().getOutputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+
+ SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
+ coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
+ DATAOUT dataout = createDataOut(output);
+ coord.getOutputEvents().getDataOut().add(dataout);
+
+ String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
+ outputFeeds.add(feed.getName());
+ outputPaths.add(outputExpr);
+
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ props.put(output.getName(), outputExpr);
+
+ propagateFileSystemProperties(output, feed, cluster, coord, storage, props);
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
+ }
+ }
+
+ // Output feed name and path for parent workflow
+ props.put(ARG.feedNames.getPropName(), StringUtils.join(outputFeeds, ','));
+ props.put(ARG.feedInstancePaths.getPropName(), StringUtils.join(outputPaths, ','));
+ }
+
+ private DATAOUT createDataOut(Output output) {
+ DATAOUT dataout = new DATAOUT();
+ dataout.setName(output.getName());
+ dataout.setDataset(output.getName());
+ dataout.setInstance(getELExpression(output.getInstance()));
+ return dataout;
+ }
+
+ private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
+ Storage storage, Properties props) throws FalconException {
+ // stats and meta paths
+ createOutputEvent(output, feed, cluster, LocationType.STATS, coord, props, storage);
+ createOutputEvent(output, feed, cluster, LocationType.META, coord, props, storage);
+ createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
+ }
+
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
+ COORDINATORAPP coord, Properties props, Storage storage) throws FalconException {
+ String name = output.getName();
+ String type = locType.name().toLowerCase();
+
+ SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
+ coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
+
+ DATAOUT dataout = new DATAOUT();
+ dataout.setName(name + type);
+ dataout.setDataset(name + type);
+ dataout.setInstance(getELExpression(output.getInstance()));
+
+ OUTPUTEVENTS outputEvents = coord.getOutputEvents();
+ if (outputEvents == null) {
+ outputEvents = new OUTPUTEVENTS();
+ coord.setOutputEvents(outputEvents);
+ }
+ outputEvents.getDataOut().add(dataout);
+
+ String outputExpr = "${coord:dataOut('" + name + type + "')}";
+ props.put(name + "." + type, outputExpr);
+ }
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+ private void propagateUserWorkflowProperties(Workflow processWorkflow, Properties props) {
+ props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
+ processWorkflow.getName(), entity.getName()));
+ props.put("userWorkflowVersion", processWorkflow.getVersion());
+ props.put("userWorkflowEngine", processWorkflow.getEngine().value());
+ }
+
+}
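For reference, a standalone sketch of the controls defaulting above, using a hypothetical 15-minute frequency (the real builder evaluates the entity's Falcon EL frequency/timeout expressions): the coordinator gets a 90-minute timeout (6x frequency, floored at 30 minutes) and a throttle of 12 (2 * timeout/frequency).

    public class CoordControlsSketch {
        private static final int THIRTY_MINUTES = 30 * 60 * 1000;

        public static void main(String[] args) {
            long frequencyInMillis = 15 * 60 * 1000L; // hypothetical: process runs every 15 minutes
            long timeoutInMillis = frequencyInMillis * 6; // default: 6x frequency
            if (timeoutInMillis < THIRTY_MINUTES) {
                timeoutInMillis = THIRTY_MINUTES; // never below the 30-minute floor
            }
            System.out.println("timeout  = " + timeoutInMillis / (1000 * 60));           // 90 (minutes)
            System.out.println("throttle = " + timeoutInMillis / frequencyInMillis * 2); // 12
        }
    }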
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
new file mode 100644
index 0000000..0d9abdb
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Base class for building the orchestration workflow for a process.
+ */
+public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Process> {
+ private static final String DEFAULT_WF_TEMPLATE = "/workflow/process-parent-workflow.xml";
+ private static final Set<String> FALCON_PROCESS_HIVE_ACTIONS = new HashSet<String>(
+ Arrays.asList(new String[]{"recordsize", "user-oozie-workflow", "user-pig-job", "user-hive-job", }));
+
+ protected ProcessExecutionWorkflowBuilder(Process entity) {
+ super(entity, Tag.DEFAULT);
+ }
+
+ @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
+ WORKFLOWAPP wfApp = getWorkflow(DEFAULT_WF_TEMPLATE);
+ String wfName = EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString();
+ wfApp.setName(wfName);
+
+ addLibExtensionsToWorkflow(cluster, wfApp, null);
+
+ final boolean isTableStorageType = isTableStorageType(cluster);
+ if (isTableStorageType) {
+ setupHiveCredentials(cluster, buildPath, wfApp);
+ }
+
+ for (Object object : wfApp.getDecisionOrForkOrJoin()) {
+ if (!(object instanceof ACTION)) {
+ continue;
+ }
+
+ ACTION action = (ACTION) object;
+ String actionName = action.getName();
+ if (FALCON_ACTIONS.contains(actionName)) {
+ decorateWithOozieRetries(action);
+ if (isTableStorageType && actionName.equals("recordsize")) {
+ // adds hive-site.xml to the action's classpath
+ action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
+ }
+ }
+
+ decorateAction(action, cluster, buildPath);
+ }
+
+ //Create parent workflow
+ marshal(cluster, wfApp, buildPath);
+ return getProperties(buildPath, wfName);
+ }
+
+ protected abstract void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException;
+
+ private void setupHiveCredentials(Cluster cluster, Path buildPath, WORKFLOWAPP wfApp) throws FalconException {
+ // create hive-site.xml file so actions can use it in the classpath
+ createHiveConfiguration(cluster, buildPath, ""); // DO NOT ADD PREFIX!!!
+
+ if (isSecurityEnabled) {
+ // add hcatalog credentials for secure mode and add a reference to each action
+ addHCatalogCredentials(wfApp, cluster, HIVE_CREDENTIAL_NAME, FALCON_PROCESS_HIVE_ACTIONS);
+ }
+ }
+
+ protected void addInputFeedsAsParams(List<String> paramList, Cluster cluster) throws FalconException {
+ if (entity.getInputs() == null) {
+ return;
+ }
+
+ for (Input input : entity.getInputs().getInputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+
+ final String inputName = input.getName();
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
+ Properties props = new Properties();
+ propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
+ for (Object key : props.keySet()) {
+ paramList.add(key + "=${wf:conf('" + key + "')}");
+ }
+
+ paramList.add(paramName + "_filter=${wf:conf('"
+ + paramName + "_partition_filter_" + entity.getWorkflow().getEngine().name().toLowerCase() + "')}");
+ }
+ }
+ }
+
+ protected void addOutputFeedsAsParams(List<String> paramList, Cluster cluster) throws FalconException {
+ if (entity.getOutputs() == null) {
+ return;
+ }
+
+ for (Output output : entity.getOutputs().getOutputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ final String outputName = output.getName(); // no prefix for backwards compatibility
+ paramList.add(outputName + "=${" + outputName + "}");
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ Properties props = new Properties();
+ propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
+ for (Object key : props.keySet()) {
+ paramList.add(key + "=${wf:conf('" + key + "')}");
+ }
+
+ final String paramName = "falcon_" + output.getName(); // prefix 'falcon' for new params
+ paramList.add(paramName + "_partitions=${wf:conf('"
+ + paramName + "_partitions_" + entity.getWorkflow().getEngine().name().toLowerCase() + "')}");
+ }
+ }
+ }
+
+ protected void propagateEntityProperties(CONFIGURATION conf, List<String> paramList) {
+ Properties entityProperties = getEntityProperties(entity);
+
+ // Propagate user defined properties to job configuration
+ final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration = conf.getProperty();
+
+ // Propagate user defined properties to pig script as macros
+ // passed as parameters -p name=value that can be accessed as $name
+ for (Entry<Object, Object> entry: entityProperties.entrySet()) {
+ org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
+ new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
+ configProperty.setName((String) entry.getKey());
+ configProperty.setValue((String) entry.getValue());
+ configuration.add(configProperty);
+
+ paramList.add(entry.getKey() + "=" + entry.getValue());
+ }
+ }
+
+ protected List<String> getPrepareDeleteOutputPathList() throws FalconException {
+ final List<String> deleteList = new ArrayList<String>();
+ if (entity.getOutputs() == null) {
+ return deleteList;
+ }
+
+ for (Output output : entity.getOutputs().getOutputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+
+ if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+ continue; // prepare delete only applies to FileSystem storage
+ }
+
+ deleteList.add("${wf:conf('" + output.getName() + "')}");
+ }
+
+ return deleteList;
+ }
+
+ protected void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
+ Path libPath) throws FalconException {
+ if (libPath == null) {
+ return;
+ }
+
+ try {
+ final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
+ if (fs.isFile(libPath)) { // File, not a Dir
+ archiveList.add(libPath.toString());
+ return;
+ }
+
+ // lib path is a directory, add each file under the lib dir to archive
+ final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ try {
+ return fs.isFile(path) && path.getName().endsWith(".jar");
+ } catch (IOException ignore) {
+ return false;
+ }
+ }
+ });
+
+ for (FileStatus fileStatus : fileStatuses) {
+ archiveList.add(fileStatus.getPath().toString());
+ }
+ } catch (IOException e) {
+ throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
+ }
+ }
+}
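To make the parameter wiring above concrete, here is a hedged sketch of the strings addInputFeedsAsParams() adds to paramList for a hypothetical input named "clicks", once under filesystem storage and once under table storage with a pig workflow engine (feed and storage objects elided):

    import java.util.ArrayList;
    import java.util.List;

    public class ProcessParamsSketch {
        public static void main(String[] args) {
            List<String> paramList = new ArrayList<String>();
            String inputName = "clicks"; // hypothetical input name

            // FILESYSTEM storage: bare name, no prefix for backwards compatibility
            paramList.add(inputName + "=${" + inputName + "}");

            // TABLE storage: 'falcon' prefixed param resolved via wf:conf()
            String paramName = "falcon_" + inputName;
            String engine = "pig"; // entity.getWorkflow().getEngine().name().toLowerCase()
            paramList.add(paramName + "_filter=${wf:conf('"
                    + paramName + "_partition_filter_" + engine + "')}");

            // [clicks=${clicks}, falcon_clicks_filter=${wf:conf('falcon_clicks_partition_filter_pig')}]
            System.out.println(paramList);
        }
    }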
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
deleted file mode 100644
index a0406e6..0000000
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ /dev/null
@@ -1,636 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.FalconRuntimException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.ExternalId;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.oozie.bundle.BUNDLEAPP;
-import org.apache.falcon.oozie.bundle.COORDINATOR;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.ObjectFactory;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.CREDENTIAL;
-import org.apache.falcon.oozie.workflow.CREDENTIALS;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.SecurityUtil;
-import org.apache.falcon.service.FalconPathFilter;
-import org.apache.falcon.service.SharedLibraryHostingService;
-import org.apache.falcon.util.OozieUtils;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.oozie.client.OozieClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.StringWriter;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Base workflow builder for falcon entities.
- * @param <T>
- */
-public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBuilder<T> {
-
- private static final Logger LOG = LoggerFactory.getLogger(OozieWorkflowBuilder.class);
- protected static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
-
- protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
- protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
-
- protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
- protected static final String MR_QUEUE_NAME = "queueName";
- protected static final String MR_JOB_PRIORITY = "jobPriority";
-
- protected static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
-
- public static final String METASTOREURIS = "hive.metastore.uris";
- public static final String METASTORE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
- public static final String METASTORE_USE_THRIFT_SASL = "hive.metastore.sasl.enabled";
-
- protected static final String IGNORE = "IGNORE";
-
- public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
- Arrays.asList(new String[]{"recordsize", "succeeded-post-processing", "failed-post-processing", }));
-
- protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
- @Override
- public boolean accept(Path path) {
- return path.getName().startsWith("falcon");
- }
-
- @Override
- public String getJarName(Path path) {
- String name = path.getName();
- if (name.endsWith(".jar")) {
- name = name.substring(0, name.indexOf(".jar"));
- }
- return name;
- }
- };
-
- protected final boolean isSecurityEnabled;
-
- protected OozieWorkflowBuilder(T entity) {
- super(entity);
- isSecurityEnabled = SecurityUtil.isSecurityEnabled();
- }
-
- protected Path getCoordPath(Path bundlePath, String coordName) {
- Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity);
- return new Path(bundlePath, tag.name());
- }
-
- protected abstract Map<String, String> getEntityProperties();
-
- public boolean map(Cluster cluster, Path bundlePath) throws FalconException {
- BUNDLEAPP bundleApp = new BUNDLEAPP();
- bundleApp.setName(EntityUtil.getWorkflowName(entity).toString());
- // all the properties are set prior to bundle and coordinators creation
-
- List<COORDINATORAPP> coordinators = getCoordinators(cluster, bundlePath);
- if (coordinators.size() == 0) {
- return false;
- }
-
- for (COORDINATORAPP coordinatorapp : coordinators) {
- Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
- String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
- EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
-
- // copy falcon libs to the workflow dir
- copySharedLibs(cluster, coordinatorapp);
-
- // add the coordinator to the bundle
- COORDINATOR bundleCoord = new COORDINATOR();
- bundleCoord.setName(coordinatorapp.getName());
- bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
- bundleApp.getCoordinator().add(bundleCoord);
- }
-
- // create logs dir once since it's at the root of the bundle path
- createLogsDir(cluster);
-
- marshal(cluster, bundleApp, bundlePath); // write the bundle
- return true;
- }
-
- private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
- FileStatus[] libs = null;
- try {
- libs = fs.listStatus(path);
- } catch(FileNotFoundException ignore) {
- //Ok if the libext is not configured
- }
-
- if (libs == null) {
- return;
- }
-
- for(FileStatus lib : libs) {
- if (lib.isDir()) {
- continue;
- }
-
- for(Object obj: wf.getDecisionOrForkOrJoin()) {
- if (!(obj instanceof ACTION)) {
- continue;
- }
- ACTION action = (ACTION) obj;
- List<String> files = null;
- if (action.getJava() != null) {
- files = action.getJava().getFile();
- } else if (action.getPig() != null) {
- files = action.getPig().getFile();
- } else if (action.getMapReduce() != null) {
- files = action.getMapReduce().getFile();
- }
- if (files != null) {
- files.add(lib.getPath().toString());
- }
- }
- }
- }
-
- protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
- throws IOException, FalconException {
- String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
- FileSystem fs = HadoopClientFactory.get().createFileSystem(
- ClusterHelper.getConfiguration(cluster));
- addExtensionJars(fs, new Path(libext), wf);
- addExtensionJars(fs, new Path(libext, type.name()), wf);
- if (StringUtils.isNotEmpty(lifecycle)) {
- addExtensionJars(fs, new Path(libext, type.name() + "/" + lifecycle), wf);
- }
- }
-
- private void copySharedLibs(Cluster cluster, COORDINATORAPP coordinatorapp) throws FalconException {
- try {
- String coordPath = coordinatorapp.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
- Path libPath = new Path(coordPath, "lib");
- SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
- libPath, cluster, FALCON_JAR_FILTER);
- } catch (IOException e) {
- throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
- }
- }
-
- public abstract List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException;
-
- protected org.apache.falcon.oozie.coordinator.CONFIGURATION getCoordConfig(Map<String, String> propMap) {
- org.apache.falcon.oozie.coordinator.CONFIGURATION conf
- = new org.apache.falcon.oozie.coordinator.CONFIGURATION();
- List<org.apache.falcon.oozie.coordinator.CONFIGURATION.Property> props = conf.getProperty();
- for (Entry<String, String> prop : propMap.entrySet()) {
- props.add(createCoordProperty(prop.getKey(), prop.getValue()));
- }
- return conf;
- }
-
- protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, String coordName) {
- Map<String, String> props = new HashMap<String, String>();
- props.put(ARG.entityName.getPropName(), entity.getName());
- props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
- props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
- props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
-
- addBrokerProperties(cluster, props);
-
- props.put(OozieClient.EXTERNAL_ID,
- new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
- "${coord:nominalTime()}").getId());
- props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
-
- addLateDataProperties(props);
-
- addClusterProperties(cluster, props);
-
- props.put(MR_QUEUE_NAME, "default");
- props.put(MR_JOB_PRIORITY, "NORMAL");
-
- //props in entity override the set props.
- props.putAll(getEntityProperties());
-
- // this cannot be overridden
- props.put("logDir", getStoragePath(EntityUtil.getLogPath(cluster, entity)));
-
- return props;
- }
-
- private void addBrokerProperties(Cluster cluster, Map<String, String> props) {
- props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
- props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
-
- String falconBrokerUrl = StartupProperties.get().getProperty(
- ARG.brokerUrl.getPropName(), "tcp://localhost:61616?daemon=true");
- props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
-
- String falconBrokerImplClass = StartupProperties.get().getProperty(
- ARG.brokerImplClass.getPropName(), ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
- props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
-
- String jmsMessageTTL = StartupProperties.get().getProperty(
- "broker.ttlInMins", DEFAULT_BROKER_MSG_TTL.toString());
- props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
- }
-
- private void addLateDataProperties(Map<String, String> props) {
- try {
- if (EntityUtil.getLateProcess(entity) == null
- || EntityUtil.getLateProcess(entity).getLateInputs() == null
- || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
- props.put("shouldRecord", "false");
- } else {
- props.put("shouldRecord", "true");
- }
- } catch (FalconException e) {
- LOG.error("Unable to get Late Process for entity: {}", entity, e);
- throw new FalconRuntimException(e);
- }
- }
-
- private void addClusterProperties(Cluster cluster, Map<String, String> props) {
- props.put(ARG.cluster.getPropName(), cluster.getName());
-
- if (cluster.getProperties() != null) {
- for (Property prop : cluster.getProperties().getProperties()) {
- props.put(prop.getName(), prop.getValue());
- }
- }
- }
-
- protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name,
- String value) {
- org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop
- = new org.apache.falcon.oozie.coordinator.CONFIGURATION.Property();
- prop.setName(name);
- prop.setValue(value);
- return prop;
- }
-
- protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
- throws FalconException {
- try {
- Marshaller marshaller = jaxbContext.createMarshaller();
- marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
- FileSystem fs = HadoopClientFactory.get().createFileSystem(
- outPath.toUri(), ClusterHelper.getConfiguration(cluster));
- OutputStream out = fs.create(outPath);
- try {
- marshaller.marshal(jaxbElement, out);
- } finally {
- out.close();
- }
- if (LOG.isDebugEnabled()) {
- StringWriter writer = new StringWriter();
- marshaller.marshal(jaxbElement, writer);
- LOG.debug("Writing definition to {} on cluster {}", outPath, cluster.getName());
- LOG.debug(writer.getBuffer().toString());
- }
-
- LOG.info("Marshalled {} to {}", jaxbElement.getDeclaredType(), outPath);
- } catch (Exception e) {
- throw new FalconException("Unable to marshall app object", e);
- }
- }
-
- private void createLogsDir(Cluster cluster) throws FalconException {
- Path logsDir = EntityUtil.getLogPath(cluster, entity);
- try {
- FileSystem fs = HadoopClientFactory.get().createFileSystem(
- ClusterHelper.getConfiguration(cluster));
- if (fs.exists(logsDir)) {
- return;
- }
-
- fs.mkdirs(logsDir);
-
- // logs are copied within oozie as the user in Post Processing and hence 777 permissions
- FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
- fs.setPermission(logsDir, permission);
- } catch (Exception e) {
- throw new FalconException("Unable to create logs dir at: " + logsDir, e);
- }
- }
-
- protected String marshal(Cluster cluster, COORDINATORAPP coord, Path outPath,
- String name) throws FalconException {
- name = (StringUtils.isEmpty(name) ? "coordinator" : name) + ".xml";
- marshal(cluster, new ObjectFactory().createCoordinatorApp(coord),
- OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, name));
- return name;
- }
-
- protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
- marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
- OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
- }
-
- protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
- marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
- OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
- }
-
- protected String getStoragePath(Path path) {
- if (path != null) {
- return getStoragePath(path.toString());
- }
- return null;
- }
-
- protected String getStoragePath(String path) {
- if (StringUtils.isNotEmpty(path)) {
- if (new Path(path).toUri().getScheme() == null) {
- path = "${nameNode}" + path;
- }
- }
- return path;
- }
-
- protected WORKFLOWAPP getWorkflowTemplate(String template) throws FalconException {
- InputStream resourceAsStream = null;
- try {
- resourceAsStream = OozieWorkflowBuilder.class.getResourceAsStream(template);
- Unmarshaller unmarshaller = OozieUtils.WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
- @SuppressWarnings("unchecked")
- JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
- resourceAsStream);
- return jaxbElement.getValue();
- } catch (JAXBException e) {
- throw new FalconException(e);
- } finally {
- IOUtils.closeQuietly(resourceAsStream);
- }
- }
-
- protected COORDINATORAPP getCoordinatorTemplate(String template) throws FalconException {
- InputStream resourceAsStream = null;
- try {
- resourceAsStream = OozieWorkflowBuilder.class.getResourceAsStream(template);
- Unmarshaller unmarshaller = OozieUtils.COORD_JAXB_CONTEXT.createUnmarshaller();
- @SuppressWarnings("unchecked")
- JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>)
- unmarshaller.unmarshal(resourceAsStream);
- return jaxbElement.getValue();
- } catch (JAXBException e) {
- throw new FalconException(e);
- } finally {
- IOUtils.closeQuietly(resourceAsStream);
- }
- }
-
- // creates hive-site.xml configuration in the conf dir for the given cluster.
- protected void createHiveConfiguration(Cluster cluster, Path workflowPath,
- String prefix) throws FalconException {
- Configuration hiveConf = getHiveCredentialsAsConf(cluster);
-
- try {
- Configuration conf = ClusterHelper.getConfiguration(cluster);
- FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
-
- // create hive conf to stagingDir
- Path confPath = new Path(workflowPath + "/conf");
-
- persistHiveConfiguration(fs, confPath, hiveConf, prefix);
- } catch (IOException e) {
- throw new FalconException("Unable to create create hive site", e);
- }
- }
-
- protected void persistHiveConfiguration(FileSystem fs, Path confPath,
- Cluster cluster, String prefix) throws IOException {
- Configuration hiveConf = getHiveCredentialsAsConf(cluster);
- persistHiveConfiguration(fs, confPath, hiveConf, prefix);
- }
-
- private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
- String prefix) throws IOException {
- OutputStream out = null;
- try {
- out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
- hiveConf.writeXml(out);
- } finally {
- IOUtils.closeQuietly(out);
- }
- }
-
- private Configuration getHiveCredentialsAsConf(Cluster cluster) {
- Map<String, String> hiveCredentials = getHiveCredentials(cluster);
-
- Configuration hiveConf = new Configuration(false);
- for (Entry<String, String> entry : hiveCredentials.entrySet()) {
- hiveConf.set(entry.getKey(), entry.getValue());
- }
-
- return hiveConf;
- }
-
- private Map<String, String> getHiveCredentials(Cluster cluster) {
- Map<String, String> hiveCredentials = new HashMap<String, String>();
-
- String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
- if (metaStoreUrl == null) {
- throw new IllegalStateException(
- "Registry interface is not defined in cluster: " + cluster.getName());
- }
-
- // Propagate the hive properties from cluster entity
- Map<String, String> hiveProperties = ClusterHelper.geHiveProperties(cluster);
- if (hiveProperties != null && !hiveProperties.isEmpty()) {
- hiveCredentials.putAll(hiveProperties);
- }
-
- hiveCredentials.put(METASTOREURIS, metaStoreUrl);
- hiveCredentials.put("hive.metastore.execute.setugi", "true");
- hiveCredentials.put("hcatNode", metaStoreUrl.replace("thrift", "hcat"));
- hiveCredentials.put("hcat.metastore.uri", metaStoreUrl);
-
- if (isSecurityEnabled) {
- String principal = ClusterHelper
- .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL);
- hiveCredentials.put(METASTORE_KERBEROS_PRINCIPAL, principal);
- hiveCredentials.put(METASTORE_USE_THRIFT_SASL, "true");
- hiveCredentials.put("hcat.metastore.principal", principal);
- }
-
- return hiveCredentials;
- }
-
- /**
- * This is only necessary if table storage is involved and security is enabled.
- *
- * @param cluster cluster entity
- * @param credentialName credential name
- * @return CREDENTIAL object
- */
- protected CREDENTIAL createHCatalogCredential(Cluster cluster, String credentialName) {
- final String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
-
- CREDENTIAL credential = new CREDENTIAL();
- credential.setName(credentialName);
- credential.setType("hcat");
-
- credential.getProperty().add(createProperty("hcat.metastore.uri", metaStoreUrl));
- credential.getProperty().add(createProperty("hcat.metastore.principal",
- ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL)));
-
- return credential;
- }
-
- private CREDENTIAL.Property createProperty(String name, String value) {
- CREDENTIAL.Property property = new CREDENTIAL.Property();
- property.setName(name);
- property.setValue(value);
- return property;
- }
-
- /**
- * This is only necessary if table storage is involved and security is enabled.
- *
- * @param workflowApp workflow xml
- * @param cluster cluster entity
- */
- protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster,
- String credentialName) {
- CREDENTIALS credentials = workflowApp.getCredentials();
- if (credentials == null) {
- credentials = new CREDENTIALS();
- }
-
- credentials.getCredential().add(createHCatalogCredential(cluster, credentialName));
-
- // add credential for workflow
- workflowApp.setCredentials(credentials);
- }
-
- /**
- * This is only necessary if table storage is involved and security is enabled.
- *
- * @param workflowApp workflow xml
- * @param cluster cluster entity
- */
- protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster,
- String credentialName, Set<String> actions) {
- addHCatalogCredentials(workflowApp, cluster, credentialName);
-
- // add credential to each action
- for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
- if (!(object instanceof ACTION)) {
- continue;
- }
-
- ACTION action = (ACTION) object;
- String actionName = action.getName();
- if (actions.contains(actionName)) {
- action.setCred(credentialName);
- }
- }
- }
-
- protected abstract boolean shouldSetupHiveConfiguration(Cluster cluster,
- T entity) throws FalconException;
-
- protected void decorateWithOozieRetries(ACTION action) {
- Properties props = RuntimeProperties.get();
- action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
- action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
- }
-
- protected Properties createAppProperties(String clusterName, Path bundlePath, String user) throws FalconException {
- Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
- Properties properties = new Properties();
- if (cluster.getProperties() != null) {
- addClusterProperties(properties, cluster.getProperties().getProperties());
- }
- properties.setProperty(OozieWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
- properties.setProperty(OozieWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
- properties.setProperty(OozieClient.BUNDLE_APP_PATH,
- "${" + OozieWorkflowEngine.NAME_NODE + "}" + bundlePath.toString());
- properties.setProperty("colo.name", cluster.getColo());
-
- properties.setProperty(OozieClient.USER_NAME, user);
- properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
- properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, "working") + "/lib");
-
- if (shouldSetupHiveConfiguration(cluster, entity)) {
- propagateHiveCredentials(cluster, properties);
- }
-
- LOG.info("Cluster: {}, PROPS: {}", cluster.getName(), properties);
- return properties;
- }
-
- private void addClusterProperties(Properties properties, List<Property> clusterProperties) {
- for (Property prop : clusterProperties) {
- properties.setProperty(prop.getName(), prop.getValue());
- }
- }
-
- /**
- * This method propagates hive credentials so the coordinator can authenticate against hive
- * for data availability triggers.
- *
- * @param cluster cluster entity
- * @param properties property object
- */
- private void propagateHiveCredentials(Cluster cluster, Properties properties) {
- Map<String, String> hiveCredentials = getHiveCredentials(cluster);
- for (Entry<String, String> entry : hiveCredentials.entrySet()) {
- properties.setProperty(entry.getKey(), entry.getValue());
- }
- }
-
- public abstract Date getNextStartTime(T entity, String cluster, Date now) throws FalconException;
-}
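A condensed sketch of the scheduling path that replaces this deleted builder, assembled from the OozieWorkflowEngine hunks below (assumes the Falcon classes shown in the diff; scheduling and committing the staging path are left to the caller):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.EntityUtil;
    import org.apache.falcon.entity.store.ConfigurationStore;
    import org.apache.falcon.entity.v0.Entity;
    import org.apache.falcon.entity.v0.EntityType;
    import org.apache.falcon.entity.v0.cluster.Cluster;
    import org.apache.falcon.oozie.OozieEntityBuilder;
    import org.apache.hadoop.fs.Path;

    import java.util.Properties;

    public class SchedulePathSketch {
        Properties buildFor(Entity entity, String clusterName) throws FalconException {
            Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
            Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
            OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
            return builder.build(cluster, buildPath);
        }
    }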
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index c65bed9..38be792 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -32,6 +32,7 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.OozieEntityBuilder;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesResult.Instance;
@@ -42,8 +43,6 @@ import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.update.UpdateHelper;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.RuntimeProperties;
-import org.apache.falcon.workflow.OozieWorkflowBuilder;
-import org.apache.falcon.workflow.WorkflowBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -142,22 +141,20 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
if (!schedClusters.isEmpty()) {
- WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
- Map<String, Properties> newFlows =
- builder.newWorkflowSchedule(schedClusters.toArray(new String[schedClusters.size()]));
- for (Map.Entry<String, Properties> entry : newFlows.entrySet()) {
- String cluster = entry.getKey();
- LOG.info("Scheduling {} on cluster {}", entity.toShortString(), cluster);
- scheduleEntity(cluster, entry.getValue(), entity);
- commitStagingPath(cluster, entry.getValue().getProperty(OozieClient.BUNDLE_APP_PATH));
+ OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
+ for (String clusterName: schedClusters) {
+ Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
+ LOG.info("Scheduling {} on cluster {}", entity.toShortString(), clusterName);
+ Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
+ Properties properties = builder.build(cluster, buildPath);
+ scheduleEntity(clusterName, properties, entity);
+ commitStagingPath(cluster, buildPath);
}
}
}
- private void commitStagingPath(String cluster, String path) throws FalconException {
- path = StringUtils.removeStart(path, "${nameNode}");
- Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
- FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(clusterEntity));
+ private void commitStagingPath(Cluster cluster, Path path) throws FalconException {
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
try {
fs.create(new Path(path, EntityUtil.SUCCEEDED_FILE_NAME)).close();
} catch (IOException e) {
@@ -405,21 +402,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
@Override
public InstancesResult getRunningInstances(Entity entity, List<LifeCycle> lifeCycles) throws FalconException {
try {
- WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
List<Instance> runInstances = new ArrayList<Instance>();
- String[] wfNames = builder.getWorkflowNames();
- List<String> coordNames = new ArrayList<String>();
- for (String wfName : wfNames) {
- if (!isCoordApplicable(wfName, lifeCycles)) {
- continue;
- }
- coordNames.add(wfName);
- }
for (String cluster : clusters) {
ProxyOozieClient client = OozieClientFactory.get(cluster);
- List<WorkflowJob> wfs = getRunningWorkflows(cluster, coordNames);
+ List<String> wfNames = EntityUtil.getWorkflowNames(entity, cluster);
+ List<WorkflowJob> wfs = getRunningWorkflows(cluster, wfNames);
if (wfs != null) {
for (WorkflowJob job : wfs) {
WorkflowJob wf = client.getJobInfo(job.getId());
@@ -958,8 +947,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
LOG.info("Updating entity through Workflow Engine {}", newEntity.toShortString());
Date newEndTime = EntityUtil.getEndTime(newEntity, cluster);
if (newEndTime.before(now())) {
- throw new FalconException("New end time for " + newEntity.getName() + " is past current time. Entity "
- + "can't be updated. Use remove and add");
+ throw new FalconException("Entity's end time " + SchemaHelper.formatDateUTC(newEndTime)
+ + " is before current time. Entity can't be updated. Use remove and add");
}
LOG.debug("Updating for cluster: {}, bundle: {}", cluster, bundle.getId());
@@ -974,8 +963,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
LOG.debug("Going to update! : {} for cluster {}, bundle: {}",
newEntity.toShortString(), cluster, bundle.getId());
- effectiveTime = updateInternal(oldEntity, newEntity, cluster, bundle,
- false, effectiveTime, CurrentUser.getUser());
+ effectiveTime = updateInternal(oldEntity, newEntity, clusterEntity, bundle, false, effectiveTime,
+ CurrentUser.getUser());
LOG.info("Entity update complete: {} for cluster {}, bundle: {}",
newEntity.toShortString(), cluster, bundle.getId());
}
@@ -999,8 +988,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
LOG.info("Triggering update for {}, {}", cluster, affectedProcBundle.getId());
- Date depEndTime = updateInternal(affectedEntity, affectedEntity, cluster,
- affectedProcBundle, false, effectiveTime, affectedProcBundle.getUser());
+ Date depEndTime = updateInternal(affectedEntity, affectedEntity, clusterEntity, affectedProcBundle,
+ false, effectiveTime, affectedProcBundle.getUser());
if (effectiveTime == null || effectiveTime.after(depEndTime)) {
effectiveTime = depEndTime;
}
@@ -1091,20 +1080,17 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
- private Date updateInternal(Entity oldEntity, Entity newEntity, String cluster,
- BundleJob oldBundle, boolean alreadyCreated, Date inEffectiveTime,
- String user) throws FalconException {
- OozieWorkflowBuilder<Entity> builder =
- (OozieWorkflowBuilder<Entity>) WorkflowBuilder.getBuilder(ENGINE, oldEntity);
-
+ private Date updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle,
+ boolean alreadyCreated, Date inEffectiveTime, String user) throws FalconException {
Job.Status oldBundleStatus = oldBundle.getStatus();
+ String clusterName = cluster.getName();
+
//Suspend coords as bundle suspend doesn't suspend coords synchronously
- suspendCoords(cluster, oldBundle);
+ suspendCoords(clusterName, oldBundle);
- Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
- Path stagingPath = EntityUtil.getLatestStagingPath(clusterEntity, oldEntity);
+ Path stagingPath = EntityUtil.getLatestStagingPath(cluster, oldEntity);
//find last scheduled bundle
- BundleJob latestBundle = findBundleForStagingPath(cluster, oldEntity, stagingPath);
+ BundleJob latestBundle = findBundleForStagingPath(clusterName, oldEntity, stagingPath);
Date effectiveTime;
if (oldBundle.getAppPath().endsWith(stagingPath.toUri().getPath())
|| latestBundle == null || !alreadyCreated) {
@@ -1121,13 +1107,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
//pick start time for new bundle which is after effectiveTime
- effectiveTime = builder.getNextStartTime(newEntity, cluster, effectiveTime);
+ effectiveTime = EntityUtil.getNextStartTime(newEntity, cluster, effectiveTime);
//schedule new bundle
String newBundleId = scheduleForUpdate(newEntity, cluster, effectiveTime, user);
//newBundleId and latestBundle will be null if effectiveTime = process end time
if (newBundleId != null) {
- latestBundle = getBundleInfo(cluster, newBundleId);
+ latestBundle = getBundleInfo(clusterName, newBundleId);
LOG.info("New bundle {} scheduled successfully with start time {}",
newBundleId, SchemaHelper.formatDateUTC(effectiveTime));
}
@@ -1144,37 +1130,37 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
if (effectiveTime != null) {
//set endtime for old coords
- updateCoords(cluster, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime);
+ updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime);
}
if (oldBundleStatus != Job.Status.SUSPENDED
&& oldBundleStatus != Job.Status.PREPSUSPENDED) {
//resume coords
- resumeCoords(cluster, oldBundle);
+ resumeCoords(clusterName, oldBundle);
}
//latestBundle will be null if effectiveTime = process end time
if (latestBundle != null) {
//create _SUCCESS in staging path to mark update is complete(to handle roll-forward for updates)
- commitStagingPath(cluster, latestBundle.getAppPath());
+ commitStagingPath(cluster, new Path(latestBundle.getAppPath()));
}
return effectiveTime;
}
- private String scheduleForUpdate(Entity entity, String cluster, Date startDate, String user)
+ private String scheduleForUpdate(Entity entity, Cluster cluster, Date startDate, String user)
throws FalconException {
Entity clone = entity.copy();
String currentUser = CurrentUser.getUser();
switchUser(user);
try {
- EntityUtil.setStartDate(clone, cluster, startDate);
- WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, clone);
- Map<String, Properties> bundleProps = builder.newWorkflowSchedule(cluster);
- LOG.info("Scheduling {} on cluster {} with props {}",
- entity.toShortString(), cluster, bundleProps);
- if (bundleProps != null && bundleProps.size() > 0) {
- return scheduleEntity(cluster, bundleProps.get(cluster), entity);
+ EntityUtil.setStartDate(clone, cluster.getName(), startDate);
+ Path buildPath = EntityUtil.getNewStagingPath(cluster, clone);
+ OozieEntityBuilder builder = OozieEntityBuilder.get(clone);
+ Properties properties = builder.build(cluster, buildPath);
+ if (properties != null) {
+ LOG.info("Scheduling {} on cluster {} with props {}", entity.toShortString(), cluster, properties);
+ return scheduleEntity(cluster.getName(), properties, entity);
} else {
LOG.info("No new workflow to be scheduled for this " + entity.toShortString());
return null;
@@ -1210,7 +1196,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
- private List<WorkflowJob> getRunningWorkflows(String cluster, List<String> wfNames) throws FalconException {
+ private List<WorkflowJob> getRunningWorkflows(String cluster, List<String> wfNames) throws
+ FalconException {
StringBuilder filter = new StringBuilder();
filter.append(OozieClient.FILTER_STATUS).append('=').append(Job.Status.RUNNING.name());
for (String wfName : wfNames) {
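For context, the filter being assembled here follows Oozie's ';'-separated name=value job-filter syntax. A minimal sketch of how the loop plausibly completes, assuming only the standard OozieClient.FILTER_NAME ("name") constant alongside the FILTER_STATUS constant used above:

    StringBuilder filter = new StringBuilder();
    filter.append(OozieClient.FILTER_STATUS).append('=').append(Job.Status.RUNNING.name());
    for (String wfName : wfNames) {
        // Oozie OR's repeated 'name' clauses, so one query covers all candidate workflows
        filter.append(';').append(OozieClient.FILTER_NAME).append('=').append(wfName);
    }
    // resolves to e.g. "status=RUNNING;name=FALCON_PROCESS_DEFAULT_my-process"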
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/resources/coordinator/replication-coordinator.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/coordinator/replication-coordinator.xml b/oozie/src/main/resources/coordinator/replication-coordinator.xml
new file mode 100644
index 0000000..693b0bd
--- /dev/null
+++ b/oozie/src/main/resources/coordinator/replication-coordinator.xml
@@ -0,0 +1,51 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<coordinator-app name="#NAME#" frequency="#FREQUENCY#"
+ start="#START_TIME#" end="#END_TIME" timezone="#TIMEZONE#"
+ xmlns="uri:oozie:coordinator:0.3">
+ <controls>
+ <concurrency>1</concurrency>
+ <execution>FIFO</execution>
+ </controls>
+ <datasets>
+ <dataset name="input-dataset" frequency="#FEED_FREQ#"
+ initial-instance="#START_TIME#" timezone="#TIMEZONE#">
+ <uri-template>#FEED_PATH#</uri-template>
+ </dataset>
+ <dataset name="output-dataset" frequency="#FEED_FREQ#"
+ initial-instance="#START_TIME#" timezone="#TIMEZONE#">
+ <uri-template>#FEED_PATH#</uri-template>
+ </dataset>
+ </datasets>
+ <input-events>
+ <data-in name="input" dataset="input-dataset">
+ <instance>${coord:current(0)}</instance>
+ </data-in>
+ </input-events>
+ <output-events>
+ <data-out name="output" dataset="output-dataset">
+ <instance>${coord:current(0)}</instance>
+ </data-out>
+ </output-events>
+ <action>
+ <workflow>
+ <app-path>#WF_PATH#</app-path>
+ <configuration/>
+ </workflow>
+ </action>
+</coordinator-app>
\ No newline at end of file
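The #PLACEHOLDER# tokens above are filled in by the replication coordinator builder before submission. A hypothetical sketch of that substitution, assuming plain string replacement (the helper shape and all values are illustrative, not the builder's actual API):

    // illustrative only: fill the template's #TOKENS# with concrete values
    String coord = template
            .replace("#NAME#", "FALCON_FEED_REPLICATION_" + feed.getName())
            .replace("#FREQUENCY#", "${coord:hours(1)}")
            .replace("#START_TIME#", "2014-07-10T00:00Z")
            .replace("#END_TIME#", "2099-12-31T00:00Z")
            .replace("#TIMEZONE#", "UTC")
            .replace("#FEED_FREQ#", "${coord:hours(1)}")
            .replace("#FEED_PATH#", "${nameNode}/data/clicks/${YEAR}-${MONTH}-${DAY}")
            .replace("#WF_PATH#", stagingPath + "/workflow.xml");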
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/resources/workflow/falcon-table-export.hql
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/falcon-table-export.hql b/oozie/src/main/resources/workflow/falcon-table-export.hql
new file mode 100644
index 0000000..37fd1b7
--- /dev/null
+++ b/oozie/src/main/resources/workflow/falcon-table-export.hql
@@ -0,0 +1,18 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+export table ${falconSourceDatabase}.${falconSourceTable} partition ${falconSourcePartition} to '${falconSourceStagingDir}';
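The script is driven by Hive variable substitution; note that the partition spec must arrive already parenthesized, since the template supplies no surrounding parentheses. A sketch with illustrative values (the parameter names are the script's, the values are made up):

    // illustrative parameter values for the export script above
    Properties hiveParams = new Properties();
    hiveParams.put("falconSourceDatabase", "source_db");
    hiveParams.put("falconSourceTable", "clicks");
    hiveParams.put("falconSourcePartition", "(ds='2014-07-10')"); // parenthesized partition spec
    hiveParams.put("falconSourceStagingDir", "hdfs://nn:8020/apps/falcon/staging/clicks");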
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/resources/workflow/falcon-table-import.hql
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/falcon-table-import.hql b/oozie/src/main/resources/workflow/falcon-table-import.hql
new file mode 100644
index 0000000..653d580
--- /dev/null
+++ b/oozie/src/main/resources/workflow/falcon-table-import.hql
@@ -0,0 +1,20 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+use ${falconTargetDatabase};
+alter table ${falconTargetTable} drop if exists partition ${falconTargetPartition};
+import table ${falconTargetTable} partition ${falconTargetPartition} from '${falconTargetStagingDir}';
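Dropping the target partition before importing makes replication re-runs idempotent: a retry after a partial failure replaces the partition rather than failing on a duplicate. The target-side parameters mirror the export side; illustrative values again:

    // illustrative parameter values for the import script above
    Properties hiveParams = new Properties();
    hiveParams.put("falconTargetDatabase", "target_db");
    hiveParams.put("falconTargetTable", "clicks");
    hiveParams.put("falconTargetPartition", "(ds='2014-07-10')");
    hiveParams.put("falconTargetStagingDir", "hdfs://nn:8020/apps/falcon/staging/clicks");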
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/resources/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/process-parent-workflow.xml b/oozie/src/main/resources/workflow/process-parent-workflow.xml
new file mode 100644
index 0000000..4a2495c
--- /dev/null
+++ b/oozie/src/main/resources/workflow/process-parent-workflow.xml
@@ -0,0 +1,278 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-process-parent-workflow'>
+ <start to='should-record'/>
+ <decision name='should-record'>
+ <switch>
+ <case to="recordsize">
+ ${shouldRecord=="true"}
+ </case>
+ <default to="user-workflow"/>
+ </switch>
+ </decision>
+ <action name='recordsize'>
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ <!-- HCatalog jars -->
+ <property>
+ <name>oozie.action.sharelib.for.java</name>
+ <value>hcatalog</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
+ <arg>-out</arg>
+ <arg>${logDir}/latedata/${nominalTime}</arg>
+ <arg>-paths</arg>
+ <arg>${falconInPaths}</arg>
+ <arg>-falconInputFeeds</arg>
+ <arg>${falconInputFeeds}</arg>
+ <arg>-falconInputFeedStorageTypes</arg>
+ <arg>${falconInputFeedStorageTypes}</arg>
+ <capture-output/>
+ </java>
+ <ok to="user-workflow"/>
+ <error to="failed-post-processing"/>
+ </action>
+ <decision name='user-workflow'>
+ <switch>
+ <case to="user-oozie-workflow">
+ ${userWorkflowEngine=="oozie"}
+ </case>
+ <case to="user-pig-job">
+ ${userWorkflowEngine=="pig"}
+ </case>
+ <case to="user-hive-job">
+ ${userWorkflowEngine=="hive"}
+ </case>
+ <default to="user-oozie-workflow"/>
+ </switch>
+ </decision>
+ <action name='user-pig-job'>
+ <pig>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ <property>
+ <name>oozie.action.sharelib.for.pig</name>
+ <value>pig,hcatalog</value>
+ </property>
+ </configuration>
+ <script>#USER_WF_PATH#</script>
+ </pig>
+ <ok to="succeeded-post-processing"/>
+ <error to="failed-post-processing"/>
+ </action>
+ <action name="user-hive-job">
+ <hive xmlns="uri:oozie:hive-action:0.2">
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ </configuration>
+ <script>#USER_WF_PATH#</script>
+ </hive>
+ <ok to="succeeded-post-processing"/>
+ <error to="failed-post-processing"/>
+ </action>
+ <action name='user-oozie-workflow'>
+ <sub-workflow>
+ <app-path>#USER_WF_PATH#</app-path>
+ <propagate-configuration/>
+ </sub-workflow>
+ <ok to="succeeded-post-processing"/>
+ <error to="failed-post-processing"/>
+ </action>
+ <action name='succeeded-post-processing'>
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+ <arg>-cluster</arg>
+ <arg>${cluster}</arg>
+ <arg>-entityType</arg>
+ <arg>${entityType}</arg>
+ <arg>-entityName</arg>
+ <arg>${entityName}</arg>
+ <arg>-nominalTime</arg>
+ <arg>${nominalTime}</arg>
+ <arg>-operation</arg>
+ <arg>GENERATE</arg>
+ <arg>-workflowId</arg>
+ <arg>${wf:id()}</arg>
+ <arg>-runId</arg>
+ <arg>${wf:run()}</arg>
+ <arg>-status</arg>
+ <arg>SUCCEEDED</arg>
+ <arg>-timeStamp</arg>
+ <arg>${timeStamp}</arg>
+ <arg>-brokerImplClass</arg>
+ <arg>${wf:conf("broker.impl.class")}</arg>
+ <arg>-brokerUrl</arg>
+ <arg>${wf:conf("broker.url")}</arg>
+ <arg>-userBrokerImplClass</arg>
+ <arg>${userBrokerImplClass}</arg>
+ <arg>-userBrokerUrl</arg>
+ <arg>${userBrokerUrl}</arg>
+ <arg>-brokerTTL</arg>
+ <arg>${wf:conf("broker.ttlInMins")}</arg>
+ <arg>-feedNames</arg>
+ <arg>${feedNames}</arg>
+ <arg>-feedInstancePaths</arg>
+ <arg>${feedInstancePaths}</arg>
+ <arg>-logFile</arg>
+ <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+ <arg>-workflowEngineUrl</arg>
+ <arg>${workflowEngineUrl}</arg>
+ <arg>-subflowId</arg>
+ <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-oozie-workflow" : ""}</arg>
+ <arg>-userWorkflowEngine</arg>
+ <arg>${userWorkflowEngine}</arg>
+ <arg>-userWorkflowName</arg>
+ <arg>${userWorkflowName}</arg>
+ <arg>-userWorkflowVersion</arg>
+ <arg>${userWorkflowVersion}</arg>
+ <arg>-logDir</arg>
+ <arg>${logDir}/job-${nominalTime}/</arg>
+ <arg>-workflowUser</arg>
+ <arg>${wf:user()}</arg>
+ <arg>-falconInputFeeds</arg>
+ <arg>${falconInputFeeds}</arg>
+ <arg>-falconInputPaths</arg>
+ <arg>${falconInPaths}</arg>
+ <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
+ <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
+ <file>${wf:conf("falcon.libpath")}/jms.jar</file>
+ <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
+ <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
+ <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
+ </java>
+ <ok to="end"/>
+ <error to="fail"/>
+ </action>
+ <action name='failed-post-processing'>
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+ <arg>-cluster</arg>
+ <arg>${cluster}</arg>
+ <arg>-entityType</arg>
+ <arg>${entityType}</arg>
+ <arg>-entityName</arg>
+ <arg>${entityName}</arg>
+ <arg>-nominalTime</arg>
+ <arg>${nominalTime}</arg>
+ <arg>-operation</arg>
+ <arg>GENERATE</arg>
+ <arg>-workflowId</arg>
+ <arg>${wf:id()}</arg>
+ <arg>-runId</arg>
+ <arg>${wf:run()}</arg>
+ <arg>-status</arg>
+ <arg>FAILED</arg>
+ <arg>-timeStamp</arg>
+ <arg>${timeStamp}</arg>
+ <arg>-brokerImplClass</arg>
+ <arg>${wf:conf("broker.impl.class")}</arg>
+ <arg>-brokerUrl</arg>
+ <arg>${wf:conf("broker.url")}</arg>
+ <arg>-userBrokerImplClass</arg>
+ <arg>${userBrokerImplClass}</arg>
+ <arg>-userBrokerUrl</arg>
+ <arg>${userBrokerUrl}</arg>
+ <arg>-brokerTTL</arg>
+ <arg>${wf:conf("broker.ttlInMins")}</arg>
+ <arg>-feedNames</arg>
+ <arg>${feedNames}</arg>
+ <arg>-feedInstancePaths</arg>
+ <arg>${feedInstancePaths}</arg>
+ <arg>-logFile</arg>
+ <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+ <arg>-workflowEngineUrl</arg>
+ <arg>${workflowEngineUrl}</arg>
+ <arg>-subflowId</arg>
+ <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-oozie-workflow" : ""}</arg>
+ <arg>-userWorkflowEngine</arg>
+ <arg>${userWorkflowEngine}</arg>
+ <arg>-logDir</arg>
+ <arg>${logDir}/job-${nominalTime}/</arg>
+ <arg>-workflowUser</arg>
+ <arg>${wf:user()}</arg>
+ <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
+ <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
+ <file>${wf:conf("falcon.libpath")}/jms.jar</file>
+ <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
+ <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
+ <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
+ </java>
+ <ok to="fail"/>
+ <error to="fail"/>
+ </action>
+ <kill name="fail">
+ <message>
+ Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+ </message>
+ </kill>
+ <end name='end'/>
+</workflow-app>
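The two decision nodes above are driven entirely by properties supplied through the coordinator; #USER_WF_PATH# is the only token patched into the XML itself. A hypothetical sketch of the properties that would route an instance through the recordsize action and then the pig branch (names mirror the EL variables in the workflow; values are illustrative):

    // illustrative properties that steer the decisions above
    Properties props = new Properties();
    props.put("shouldRecord", "true");       // 'should-record' -> recordsize
    props.put("userWorkflowEngine", "pig");  // 'user-workflow' -> user-pig-job
    props.put("queueName", "default");
    props.put("jobPriority", "NORMAL");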