Posted to commits@falcon.apache.org by sh...@apache.org on 2014/03/18 12:41:08 UTC
[3/5] FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS
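
For orientation, a minimal sketch of how callers drive the merged builder after this change, inferred from the call sites in the OozieWorkflowEngine hunks below; ENGINE, entity, and schedClusters are assumed to be in scope as in that class:

    // The builder is now constructed around the entity itself...
    WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);

    // ...so schedule and lookup calls no longer take the entity as an argument.
    Map<String, Properties> newFlows =
            builder.newWorkflowSchedule(schedClusters.toArray(new String[schedClusters.size()]));
    String[] wfNames = builder.getWorkflowNames();

    // The returned map is keyed by cluster name, one Properties per scheduled bundle.
    for (Map.Entry<String, Properties> entry : newFlows.entrySet()) {
        scheduleEntity(entry.getKey(), entry.getValue(), entity);
    }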
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index e5a01ca..990fdc5 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -18,22 +18,64 @@
package org.apache.falcon.workflow;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconRuntimException;
+import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.ExternalId;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.bundle.COORDINATOR;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.ObjectFactory;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.service.FalconPathFilter;
+import org.apache.falcon.service.SharedLibraryHostingService;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.oozie.client.OozieClient;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.util.Arrays;
import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
+import java.util.Set;
/**
* Base workflow builder for falcon entities.
@@ -44,6 +86,334 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
private static final Logger LOG = Logger.getLogger(OozieWorkflowBuilder.class);
protected static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
+ protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
+ protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
+
+ protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
+ protected static final String MR_QUEUE_NAME = "queueName";
+ protected static final String MR_JOB_PRIORITY = "jobPriority";
+
+ public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
+ Arrays.asList(new String[]{"recordsize", "succeeded-post-processing", "failed-post-processing", }));
+
+ protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith("falcon");
+ }
+
+ @Override
+ public String getJarName(Path path) {
+ String name = path.getName();
+ if (name.endsWith(".jar")) {
+ name = name.substring(0, name.indexOf(".jar"));
+ }
+ return name;
+ }
+ };
+
+ protected OozieWorkflowBuilder(T entity) {
+ super(entity);
+ }
+
+ protected Path getCoordPath(Path bundlePath, String coordName) {
+ Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity);
+ return new Path(bundlePath, tag.name());
+ }
+
+ protected abstract Map<String, String> getEntityProperties();
+
+ public boolean map(Cluster cluster, Path bundlePath) throws FalconException {
+ BUNDLEAPP bundleApp = new BUNDLEAPP();
+ bundleApp.setName(EntityUtil.getWorkflowName(entity).toString());
+ // all the properties are set prior to bundle and coordinator creation
+
+ List<COORDINATORAPP> coordinators = getCoordinators(cluster, bundlePath);
+ if (coordinators.size() == 0) {
+ return false;
+ }
+ for (COORDINATORAPP coordinatorapp : coordinators) {
+ Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
+ String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
+ EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
+ createLogsDir(cluster, coordPath);
+ COORDINATOR bundleCoord = new COORDINATOR();
+ bundleCoord.setName(coordinatorapp.getName());
+ bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
+ bundleApp.getCoordinator().add(bundleCoord);
+
+ copySharedLibs(cluster, coordPath);
+ }
+
+ marshal(cluster, bundleApp, bundlePath);
+ return true;
+ }
+
+ private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
+ FileStatus[] libs = null;
+ try {
+ libs = fs.listStatus(path);
+ } catch(FileNotFoundException ignore) {
+ //Ok if the libext is not configured
+ }
+
+ if (libs == null) {
+ return;
+ }
+
+ for(FileStatus lib : libs) {
+ if (lib.isDir()) {
+ continue;
+ }
+
+ for(Object obj: wf.getDecisionOrForkOrJoin()) {
+ if (!(obj instanceof ACTION)) {
+ continue;
+ }
+ ACTION action = (ACTION) obj;
+ List<String> files = null;
+ if (action.getJava() != null) {
+ files = action.getJava().getFile();
+ } else if (action.getPig() != null) {
+ files = action.getPig().getFile();
+ } else if (action.getMapReduce() != null) {
+ files = action.getMapReduce().getFile();
+ }
+ if (files != null) {
+ files.add(lib.getPath().toString());
+ }
+ }
+ }
+ }
+
+ protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
+ throws IOException, FalconException {
+ String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+ addExtensionJars(fs, new Path(libext), wf);
+ addExtensionJars(fs, new Path(libext, type.name()), wf);
+ if (StringUtils.isNotEmpty(lifecycle)) {
+ addExtensionJars(fs, new Path(libext, type.name() + "/" + lifecycle), wf);
+ }
+ }
+
+ private void copySharedLibs(Cluster cluster, Path coordPath) throws FalconException {
+ try {
+ Path libPath = new Path(coordPath, "lib");
+ SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
+ libPath, cluster, FALCON_JAR_FILTER);
+ } catch (IOException e) {
+ throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
+ }
+ }
+
+ public abstract List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException;
+
+ protected org.apache.falcon.oozie.coordinator.CONFIGURATION getCoordConfig(Map<String, String> propMap) {
+ org.apache.falcon.oozie.coordinator.CONFIGURATION conf
+ = new org.apache.falcon.oozie.coordinator.CONFIGURATION();
+ List<org.apache.falcon.oozie.coordinator.CONFIGURATION.Property> props = conf.getProperty();
+ for (Entry<String, String> prop : propMap.entrySet()) {
+ props.add(createCoordProperty(prop.getKey(), prop.getValue()));
+ }
+ return conf;
+ }
+
+ protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, Path coordPath, String coordName) {
+ Map<String, String> props = new HashMap<String, String>();
+ props.put(ARG.entityName.getPropName(), entity.getName());
+ props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
+ props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
+ props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
+ props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
+ String falconBrokerUrl = StartupProperties.get().getProperty(ARG.brokerUrl.getPropName(),
+ "tcp://localhost:61616?daemon=true");
+ props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
+ String falconBrokerImplClass = StartupProperties.get().getProperty(ARG.brokerImplClass.getPropName(),
+ ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+ props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
+ String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
+ DEFAULT_BROKER_MSG_TTL.toString());
+ props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
+ props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
+ props.put("logDir", getStoragePath(new Path(coordPath, "../../logs")));
+ props.put(OozieClient.EXTERNAL_ID,
+ new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
+ "${coord:nominalTime()}").getId());
+ props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
+ try {
+ if (EntityUtil.getLateProcess(entity) == null
+ || EntityUtil.getLateProcess(entity).getLateInputs() == null
+ || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
+ props.put("shouldRecord", "false");
+ } else {
+ props.put("shouldRecord", "true");
+ }
+ } catch (FalconException e) {
+ LOG.error("Unable to get Late Process for entity:" + entity, e);
+ throw new FalconRuntimException(e);
+ }
+ props.put("entityName", entity.getName());
+ props.put("entityType", entity.getEntityType().name().toLowerCase());
+ props.put(ARG.cluster.getPropName(), cluster.getName());
+ if (cluster.getProperties() != null) {
+ for (Property prop : cluster.getProperties().getProperties()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+ }
+
+ props.put(MR_QUEUE_NAME, "default");
+ props.put(MR_JOB_PRIORITY, "NORMAL");
+ // props defined in the entity override the props set above.
+ props.putAll(getEntityProperties());
+ return props;
+ }
+
+ protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name,
+ String value) {
+ org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop
+ = new org.apache.falcon.oozie.coordinator.CONFIGURATION.Property();
+ prop.setName(name);
+ prop.setValue(value);
+ return prop;
+ }
+
+ protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
+ throws FalconException {
+ try {
+ Marshaller marshaller = jaxbContext.createMarshaller();
+ marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(
+ outPath.toUri(), ClusterHelper.getConfiguration(cluster));
+ OutputStream out = fs.create(outPath);
+ try {
+ marshaller.marshal(jaxbElement, out);
+ } finally {
+ out.close();
+ }
+ if (LOG.isDebugEnabled()) {
+ StringWriter writer = new StringWriter();
+ marshaller.marshal(jaxbElement, writer);
+ LOG.debug("Writing definition to " + outPath + " on cluster " + cluster.getName());
+ LOG.debug(writer.getBuffer());
+ }
+
+ LOG.info("Marshalled " + jaxbElement.getDeclaredType() + " to " + outPath);
+ } catch (Exception e) {
+ throw new FalconException("Unable to marshall app object", e);
+ }
+ }
+
+ private void createLogsDir(Cluster cluster, Path coordPath) throws FalconException {
+ try {
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(
+ coordPath.toUri(), ClusterHelper.getConfiguration(cluster));
+ Path logsDir = new Path(coordPath, "../../logs");
+ fs.mkdirs(logsDir);
+
+ // logs are copied within oozie as the user in Post Processing and hence the 777 permissions
+ FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+ fs.setPermission(logsDir, permission);
+ } catch (Exception e) {
+ throw new FalconException("Unable to create temp dir in " + coordPath, e);
+ }
+ }
+
+ protected String marshal(Cluster cluster, COORDINATORAPP coord, Path outPath, String name) throws FalconException {
+ if (StringUtils.isEmpty(name)) {
+ name = "coordinator";
+ }
+ name = name + ".xml";
+ marshal(cluster, new ObjectFactory().createCoordinatorApp(coord), OozieUtils.COORD_JAXB_CONTEXT,
+ new Path(outPath, name));
+ return name;
+ }
+
+ protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
+ marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
+ OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
+ }
+
+ protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
+ marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
+ OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
+ }
+
+ protected String getStoragePath(Path path) {
+ if (path != null) {
+ return getStoragePath(path.toString());
+ }
+ return null;
+ }
+
+ protected String getStoragePath(String path) {
+ if (StringUtils.isNotEmpty(path)) {
+ if (new Path(path).toUri().getScheme() == null) {
+ path = "${nameNode}" + path;
+ }
+ }
+ return path;
+ }
+
+ protected WORKFLOWAPP getWorkflowTemplate(String template) throws FalconException {
+ InputStream resourceAsStream = null;
+ try {
+ resourceAsStream = OozieWorkflowBuilder.class.getResourceAsStream(template);
+ Unmarshaller unmarshaller = OozieUtils.WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
+ @SuppressWarnings("unchecked")
+ JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
+ resourceAsStream);
+ return jaxbElement.getValue();
+ } catch (JAXBException e) {
+ throw new FalconException(e);
+ } finally {
+ IOUtils.closeQuietly(resourceAsStream);
+ }
+ }
+
+ protected COORDINATORAPP getCoordinatorTemplate(String template) throws FalconException {
+ InputStream resourceAsStream = null;
+ try {
+ resourceAsStream = OozieWorkflowBuilder.class.getResourceAsStream(template);
+ Unmarshaller unmarshaller = OozieUtils.COORD_JAXB_CONTEXT.createUnmarshaller();
+ @SuppressWarnings("unchecked")
+ JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>)
+ unmarshaller.unmarshal(resourceAsStream);
+ return jaxbElement.getValue();
+ } catch (JAXBException e) {
+ throw new FalconException(e);
+ } finally {
+ IOUtils.closeQuietly(resourceAsStream);
+ }
+ }
+
+ protected void createHiveConf(FileSystem fs, Path confPath, String metastoreUrl,
+ Cluster cluster, String prefix) throws IOException {
+ Configuration hiveConf = new Configuration(false);
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUrl);
+ hiveConf.set("hive.metastore.local", "false");
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+ ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL));
+ hiveConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+ }
+
+ OutputStream out = null;
+ try {
+ out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
+ hiveConf.writeXml(out);
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ }
+
+ protected void decorateWithOozieRetries(ACTION action) {
+ Properties props = RuntimeProperties.get();
+ action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
+ action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
+ }
+
protected Properties createAppProperties(String clusterName, Path bundlePath, String user) throws FalconException {
Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
Properties properties = new Properties();
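
A quick illustration of the getStoragePath helpers added above: a scheme-less path is prefixed with the ${nameNode} Oozie EL variable, while fully qualified and null paths pass through unchanged; the example paths here are made up:

    getStoragePath("/falcon/staging/bundle");                 // "${nameNode}/falcon/staging/bundle"
    getStoragePath("hdfs://nn.example.com:8020/falcon/tmp");  // returned unchanged (has a scheme)
    getStoragePath((Path) null);                              // null (the Path overload is null-safe)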
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index ac8862e..d819e93 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -116,7 +116,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
if (!schedClusters.isEmpty()) {
WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
- Map<String, Properties> newFlows = builder.newWorkflowSchedule(entity, schedClusters);
+ Map<String, Properties> newFlows = builder.newWorkflowSchedule(schedClusters.toArray(new
+ String[schedClusters.size()]));
for (Map.Entry<String, Properties> entry : newFlows.entrySet()) {
String cluster = entry.getKey();
LOG.info("Scheduling " + entity.toShortString() + " on cluster " + cluster);
@@ -380,7 +381,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
List<Instance> runInstances = new ArrayList<Instance>();
- String[] wfNames = builder.getWorkflowNames(entity);
+ String[] wfNames = builder.getWorkflowNames();
List<String> coordNames = new ArrayList<String>();
for (String wfName : wfNames) {
if (EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString().equals(wfName)) {
@@ -1059,11 +1060,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private String scheduleForUpdate(Entity entity, String cluster, Date startDate, String user)
throws FalconException {
- WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
- Properties bundleProps = builder.newWorkflowSchedule(entity, startDate, cluster, user);
+ Entity clone = entity.copy();
+ EntityUtil.setStartDate(entity, cluster, startDate);
+ WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, clone);
+ Map<String, Properties> bundleProps = builder.newWorkflowSchedule(cluster);
LOG.info("Scheduling " + entity.toShortString() + " on cluster " + cluster + " with props " + bundleProps);
- if (bundleProps != null) {
- return scheduleEntity(cluster, bundleProps, entity);
+ if (bundleProps != null && bundleProps.size() > 0) {
+ return scheduleEntity(cluster, bundleProps.get(cluster), entity);
} else {
LOG.info("No new workflow to be scheduled for this " + entity.toShortString());
return null;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
deleted file mode 100644
index e638961..0000000
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ /dev/null
@@ -1,833 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Property;
-import org.apache.falcon.entity.v0.process.Workflow;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.oozie.coordinator.CONTROLS;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.DATAIN;
-import org.apache.falcon.oozie.coordinator.DATAOUT;
-import org.apache.falcon.oozie.coordinator.DATASETS;
-import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
-import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.coordinator.WORKFLOW;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DELETE;
-import org.apache.falcon.oozie.workflow.PIG;
-import org.apache.falcon.oozie.workflow.PREPARE;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.update.UpdateHelper;
-import org.apache.hadoop.fs.*;
-import org.apache.xerces.dom.ElementNSImpl;
-import org.w3c.dom.Document;
-
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.dom.DOMResult;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class maps the Falcon entities into Oozie artifacts.
- */
-public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
- private static final String DEFAULT_WF_TEMPLATE = "/config/workflow/process-parent-workflow.xml";
- private static final int THIRTY_MINUTES = 30 * 60 * 1000;
-
- public OozieProcessMapper(Process entity) {
- super(entity);
- }
-
- @Override
- protected List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
- try {
- FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
- Process process = getEntity();
-
- //Copy user workflow and lib to staging dir
- Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(process.getWorkflow().getPath()),
- new Path(bundlePath, EntityUtil.PROCESS_USER_DIR));
- if (process.getWorkflow().getLib() != null && fs.exists(new Path(process.getWorkflow().getLib()))) {
- checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(process.getWorkflow().getLib()),
- new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR)));
- }
-
- writeChecksums(fs, new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
- } catch (IOException e) {
- throw new FalconException("Failed to copy user workflow/lib", e);
- }
-
- List<COORDINATORAPP> apps = new ArrayList<COORDINATORAPP>();
- apps.add(createDefaultCoordinator(cluster, bundlePath));
-
- return apps;
- }
-
- private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
- try {
- FSDataOutputStream stream = fs.create(path);
- try {
- for (Map.Entry<String, String> entry : checksums.entrySet()) {
- stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
- }
- } finally {
- stream.close();
- }
- } catch (IOException e) {
- throw new FalconException("Failed to copy user workflow/lib", e);
- }
- }
-
- private Path getUserWorkflowPath(Cluster cluster, Path bundlePath) throws FalconException {
- try {
- FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
- Process process = getEntity();
- Path wfPath = new Path(process.getWorkflow().getPath());
- if (fs.isFile(wfPath)) {
- return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
- } else {
- return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR);
- }
- } catch(IOException e) {
- throw new FalconException("Failed to get workflow path", e);
- }
- }
-
- private Path getUserLibPath(Cluster cluster, Path bundlePath) throws FalconException {
- try {
- Process process = getEntity();
- if (process.getWorkflow().getLib() == null) {
- return null;
- }
- Path libPath = new Path(process.getWorkflow().getLib());
-
- FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
- if (fs.isFile(libPath)) {
- return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
- } else {
- return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR);
- }
- } catch(IOException e) {
- throw new FalconException("Failed to get user lib path", e);
- }
- }
-
- /**
- * Creates default oozie coordinator.
- *
- * @param cluster - Cluster for which the coordinator app needs to be created
- * @param bundlePath - bundle path
- * @return COORDINATORAPP
- * @throws FalconException on Error
- */
- public COORDINATORAPP createDefaultCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
- Process process = getEntity();
- if (process == null) {
- return null;
- }
-
- COORDINATORAPP coord = new COORDINATORAPP();
- String coordName = EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString();
- Path coordPath = getCoordPath(bundlePath, coordName);
-
- // coord attributes
- initializeCoordAttributes(cluster, process, coord, coordName);
-
- CONTROLS controls = initializeControls(process); // controls
- coord.setControls(controls);
-
- // Configuration
- Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
-
- initializeInputPaths(cluster, process, coord, props); // inputs
- initializeOutputPaths(cluster, process, coord, props); // outputs
-
- Workflow processWorkflow = process.getWorkflow();
- propagateUserWorkflowProperties(processWorkflow, props, process.getName());
-
- // create parent wf
- createWorkflow(cluster, process, processWorkflow, coordName, coordPath);
-
- WORKFLOW wf = new WORKFLOW();
- wf.setAppPath(getStoragePath(coordPath.toString()));
- wf.setConfiguration(getCoordConfig(props));
-
- // set coord action to parent wf
- org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
- action.setWorkflow(wf);
- coord.setAction(action);
-
- return coord;
- }
-
- private void initializeCoordAttributes(Cluster cluster, Process process, COORDINATORAPP coord, String coordName) {
- coord.setName(coordName);
- org.apache.falcon.entity.v0.process.Cluster processCluster =
- ProcessHelper.getCluster(process, cluster.getName());
- coord.setStart(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()));
- coord.setEnd(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()));
- coord.setTimezone(process.getTimezone().getID());
- coord.setFrequency("${coord:" + process.getFrequency().toString() + "}");
- }
-
- private CONTROLS initializeControls(Process process)
- throws FalconException {
- CONTROLS controls = new CONTROLS();
- controls.setConcurrency(String.valueOf(process.getParallel()));
- controls.setExecution(process.getOrder().name());
-
- Frequency timeout = process.getTimeout();
- long frequencyInMillis = ExpressionHelper.get().evaluate(process.getFrequency().toString(), Long.class);
- long timeoutInMillis;
- if (timeout != null) {
- timeoutInMillis = ExpressionHelper.get().
- evaluate(process.getTimeout().toString(), Long.class);
- } else {
- timeoutInMillis = frequencyInMillis * 6;
- if (timeoutInMillis < THIRTY_MINUTES) {
- timeoutInMillis = THIRTY_MINUTES;
- }
- }
- controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
-
- if (timeoutInMillis / frequencyInMillis * 2 > 0) {
- controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
- }
-
- return controls;
- }
-
- private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
- Map<String, String> props) throws FalconException {
- if (process.getInputs() == null) {
- props.put("falconInputFeeds", "NONE");
- props.put("falconInPaths", "IGNORE");
- return;
- }
-
- List<String> inputFeeds = new ArrayList<String>();
- List<String> inputPaths = new ArrayList<String>();
- List<String> inputFeedStorageTypes = new ArrayList<String>();
- for (Input input : process.getInputs().getInputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
- Storage storage = FeedHelper.createStorage(cluster, feed);
-
- if (!input.isOptional()) {
- if (coord.getDatasets() == null) {
- coord.setDatasets(new DATASETS());
- }
- if (coord.getInputEvents() == null) {
- coord.setInputEvents(new INPUTEVENTS());
- }
-
- SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
- coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-
- DATAIN datain = createDataIn(input);
- coord.getInputEvents().getDataIn().add(datain);
- }
-
- String inputExpr = null;
- if (storage.getType() == Storage.TYPE.FILESYSTEM) {
- inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
- props.put(input.getName(), inputExpr);
- } else if (storage.getType() == Storage.TYPE.TABLE) {
- inputExpr = "${coord:dataIn('" + input.getName() + "')}";
- propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
- }
-
- inputFeeds.add(feed.getName());
- inputPaths.add(inputExpr);
- inputFeedStorageTypes.add(storage.getType().name());
- }
-
- propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
- }
-
- private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
- List<String> inputFeedStorageTypes, Map<String, String> props) {
- // populate late data handler - should-record action
- props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
- props.put("falconInPaths", join(inputPaths.iterator(), '#'));
-
- // storage type for each corresponding feed sent as a param to LateDataHandler
- // needed to compute usage based on storage type in LateDataHandler
- props.put("falconInputFeedStorageTypes", join(inputFeedStorageTypes.iterator(), '#'));
- }
-
- private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
- Map<String, String> props) throws FalconException {
- if (process.getOutputs() == null) {
- props.put(ARG.feedNames.getPropName(), "NONE");
- props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
- return;
- }
-
- if (coord.getDatasets() == null) {
- coord.setDatasets(new DATASETS());
- }
-
- if (coord.getOutputEvents() == null) {
- coord.setOutputEvents(new OUTPUTEVENTS());
- }
-
- List<String> outputFeeds = new ArrayList<String>();
- List<String> outputPaths = new ArrayList<String>();
- for (Output output : process.getOutputs().getOutputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
- Storage storage = FeedHelper.createStorage(cluster, feed);
-
- SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
- coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-
- DATAOUT dataout = createDataOut(output);
- coord.getOutputEvents().getDataOut().add(dataout);
-
- String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
- outputFeeds.add(feed.getName());
- outputPaths.add(outputExpr);
-
- if (storage.getType() == Storage.TYPE.FILESYSTEM) {
- props.put(output.getName(), outputExpr);
-
- propagateFileSystemProperties(output, feed, cluster, coord, storage, props);
- } else if (storage.getType() == Storage.TYPE.TABLE) {
- propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
- }
- }
-
- // Output feed name and path for parent workflow
- props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
- props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
- }
-
- private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
- String datasetName, LocationType locationType) throws FalconException {
-
- SYNCDATASET syncdataset = new SYNCDATASET();
- syncdataset.setName(datasetName);
- syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-
- String uriTemplate = storage.getUriTemplate(locationType);
- if (storage.getType() == Storage.TYPE.TABLE) {
- uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
- }
- syncdataset.setUriTemplate(uriTemplate);
-
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
- syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
- syncdataset.setTimezone(feed.getTimezone().getID());
-
- if (feed.getAvailabilityFlag() == null) {
- syncdataset.setDoneFlag("");
- } else {
- syncdataset.setDoneFlag(feed.getAvailabilityFlag());
- }
-
- return syncdataset;
- }
-
- private DATAOUT createDataOut(Output output) {
- DATAOUT dataout = new DATAOUT();
- dataout.setName(output.getName());
- dataout.setDataset(output.getName());
- dataout.setInstance(getELExpression(output.getInstance()));
- return dataout;
- }
-
- private DATAIN createDataIn(Input input) {
- DATAIN datain = new DATAIN();
- datain.setName(input.getName());
- datain.setDataset(input.getName());
- datain.setStartInstance(getELExpression(input.getStart()));
- datain.setEndInstance(getELExpression(input.getEnd()));
- return datain;
- }
-
- private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
- Storage storage, Map<String, String> props)
- throws FalconException {
-
- // stats and meta paths
- createOutputEvent(output, feed, cluster, LocationType.STATS, coord, props, storage);
- createOutputEvent(output, feed, cluster, LocationType.META, coord, props, storage);
- createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
- }
-
- //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
- private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
- COORDINATORAPP coord, Map<String, String> props, Storage storage)
- throws FalconException {
-
- String name = output.getName();
- String type = locType.name().toLowerCase();
-
- SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
- coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
-
- DATAOUT dataout = new DATAOUT();
- dataout.setName(name + type);
- dataout.setDataset(name + type);
- dataout.setInstance(getELExpression(output.getInstance()));
-
- OUTPUTEVENTS outputEvents = coord.getOutputEvents();
- if (outputEvents == null) {
- outputEvents = new OUTPUTEVENTS();
- coord.setOutputEvents(outputEvents);
- }
- outputEvents.getDataOut().add(dataout);
-
- String outputExpr = "${coord:dataOut('" + name + type + "')}";
- props.put(name + "." + type, outputExpr);
- }
- //RESUME CHECKSTYLE CHECK ParameterNumberCheck
-
- private void propagateCommonCatalogTableProperties(CatalogStorage tableStorage,
- Map<String, String> props, String prefix) {
- props.put(prefix + "_storage_type", tableStorage.getType().name());
- props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
- props.put(prefix + "_database", tableStorage.getDatabase());
- props.put(prefix + "_table", tableStorage.getTable());
- }
-
- private void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage,
- Map<String, String> props) {
- String prefix = "falcon_" + input.getName();
-
- propagateCommonCatalogTableProperties(tableStorage, props, prefix);
-
- props.put(prefix + "_partition_filter_pig",
- "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
- props.put(prefix + "_partition_filter_hive",
- "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
- props.put(prefix + "_partition_filter_java",
- "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
- }
-
- private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,
- Map<String, String> props) {
- String prefix = "falcon_" + output.getName();
-
- propagateCommonCatalogTableProperties(tableStorage, props, prefix);
-
- props.put(prefix + "_dataout_partitions",
- "${coord:dataOutPartitions('" + output.getName() + "')}");
- props.put(prefix + "_dated_partition_value", "${coord:dataOutPartitionValue('"
- + output.getName() + "', '" + tableStorage.getDatedPartitionKey() + "')}");
- }
-
- private String join(Iterator<String> itr, char sep) {
- String joinedStr = StringUtils.join(itr, sep);
- if (joinedStr.isEmpty()) {
- joinedStr = "null";
- }
- return joinedStr;
- }
-
- private String getELExpression(String expr) {
- if (expr != null) {
- expr = "${" + expr + "}";
- }
- return expr;
- }
-
- @Override
- protected Map<String, String> getEntityProperties() {
- Process process = getEntity();
- Map<String, String> props = new HashMap<String, String>();
- if (process.getProperties() != null) {
- for (Property prop : process.getProperties().getProperties()) {
- props.put(prop.getName(), prop.getValue());
- }
- }
- return props;
- }
-
- private void propagateUserWorkflowProperties(Workflow processWorkflow,
- Map<String, String> props, String processName) {
- props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
- processWorkflow.getName(), processName));
- props.put("userWorkflowVersion", processWorkflow.getVersion());
- props.put("userWorkflowEngine", processWorkflow.getEngine().value());
- }
-
- protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
- String wfName, Path parentWfPath) throws FalconException {
- WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
- wfApp.setName(wfName);
- try {
- addLibExtensionsToWorkflow(cluster, wfApp, EntityType.PROCESS, null);
- } catch (IOException e) {
- throw new FalconException("Failed to add library extensions for the workflow", e);
- }
-
- String userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent()).toString();
- EngineType engineType = processWorkflow.getEngine();
- for (Object object : wfApp.getDecisionOrForkOrJoin()) {
- if (!(object instanceof ACTION)) {
- continue;
- }
-
- ACTION action = (ACTION) object;
- String actionName = action.getName();
- if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
- action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
- } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
- decoratePIGAction(cluster, process, action.getPig(), parentWfPath);
- } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
- decorateHiveAction(cluster, process, action, parentWfPath);
- } else if (FALCON_ACTIONS.contains(actionName)) {
- decorateWithOozieRetries(action);
- }
- }
-
- //Create parent workflow
- marshal(cluster, wfApp, parentWfPath);
- }
-
- private void decoratePIGAction(Cluster cluster, Process process,
- PIG pigAction, Path parentWfPath) throws FalconException {
- Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
- pigAction.setScript("${nameNode}" + userWfPath.toString());
-
- addPrepareDeleteOutputPath(process, pigAction);
-
- final List<String> paramList = pigAction.getParam();
- addInputFeedsAsParams(paramList, process, cluster, EngineType.PIG.name().toLowerCase());
- addOutputFeedsAsParams(paramList, process, cluster);
-
- propagateProcessProperties(pigAction, process);
-
- Storage.TYPE storageType = getStorageType(cluster, process);
- if (Storage.TYPE.TABLE == storageType) {
- // adds hive-site.xml in pig classpath
- setupHiveConfiguration(cluster, parentWfPath, ""); // DO NOT ADD PREFIX!!!
- pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
- }
-
- addArchiveForCustomJars(cluster, pigAction.getArchive(),
- getUserLibPath(cluster, parentWfPath.getParent()));
- }
-
- private void decorateHiveAction(Cluster cluster, Process process, ACTION wfAction,
- Path parentWfPath) throws FalconException {
-
- JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = unMarshalHiveAction(wfAction);
- org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
-
- Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
- hiveAction.setScript("${nameNode}" + userWfPath.toString());
-
- addPrepareDeleteOutputPath(process, hiveAction);
-
- final List<String> paramList = hiveAction.getParam();
- addInputFeedsAsParams(paramList, process, cluster, EngineType.HIVE.name().toLowerCase());
- addOutputFeedsAsParams(paramList, process, cluster);
-
- propagateProcessProperties(hiveAction, process);
-
- setupHiveConfiguration(cluster, parentWfPath, "falcon-");
-
- addArchiveForCustomJars(cluster, hiveAction.getArchive(),
- getUserLibPath(cluster, parentWfPath.getParent()));
-
- marshalHiveAction(wfAction, actionJaxbElement);
- }
-
- private void addPrepareDeleteOutputPath(Process process,
- PIG pigAction) throws FalconException {
- List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
- if (deleteOutputPathList.isEmpty()) {
- return;
- }
-
- final PREPARE prepare = new PREPARE();
- final List<DELETE> deleteList = prepare.getDelete();
-
- for (String deletePath : deleteOutputPathList) {
- final DELETE delete = new DELETE();
- delete.setPath(deletePath);
- deleteList.add(delete);
- }
-
- if (!deleteList.isEmpty()) {
- pigAction.setPrepare(prepare);
- }
- }
-
- private void addPrepareDeleteOutputPath(Process process,
- org.apache.falcon.oozie.hive.ACTION hiveAction)
- throws FalconException {
-
- List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
- if (deleteOutputPathList.isEmpty()) {
- return;
- }
-
- org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
- List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
-
- for (String deletePath : deleteOutputPathList) {
- org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
- delete.setPath(deletePath);
- deleteList.add(delete);
- }
-
- if (!deleteList.isEmpty()) {
- hiveAction.setPrepare(prepare);
- }
- }
-
- private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
- final List<String> deleteList = new ArrayList<String>();
- if (process.getOutputs() == null) {
- return deleteList;
- }
-
- for (Output output : process.getOutputs().getOutputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-
- if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
- continue; // prepare delete only applies to FileSystem storage
- }
-
- deleteList.add("${wf:conf('" + output.getName() + "')}");
- }
-
- return deleteList;
- }
-
- private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
- String engineType) throws FalconException {
- if (process.getInputs() == null) {
- return;
- }
-
- for (Input input : process.getInputs().getInputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
- Storage storage = FeedHelper.createStorage(cluster, feed);
-
- final String inputName = input.getName();
- if (storage.getType() == Storage.TYPE.FILESYSTEM) {
- paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
- } else if (storage.getType() == Storage.TYPE.TABLE) {
- final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
- Map<String, String> props = new HashMap<String, String>();
- propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
- for (String key : props.keySet()) {
- paramList.add(key + "=${wf:conf('" + key + "')}");
- }
-
- paramList.add(paramName + "_filter=${wf:conf('"
- + paramName + "_partition_filter_" + engineType + "')}");
- }
- }
- }
-
- private void addOutputFeedsAsParams(List<String> paramList, Process process,
- Cluster cluster) throws FalconException {
- if (process.getOutputs() == null) {
- return;
- }
-
- for (Output output : process.getOutputs().getOutputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
- Storage storage = FeedHelper.createStorage(cluster, feed);
-
- if (storage.getType() == Storage.TYPE.FILESYSTEM) {
- final String outputName = output.getName(); // no prefix for backwards compatibility
- paramList.add(outputName + "=${" + outputName + "}");
- } else if (storage.getType() == Storage.TYPE.TABLE) {
- Map<String, String> props = new HashMap<String, String>();
- propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
- for (String key : props.keySet()) {
- paramList.add(key + "=${wf:conf('" + key + "')}");
- }
- }
- }
- }
-
- private void propagateProcessProperties(PIG pigAction, Process process) {
- org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
- if (processProperties == null) {
- return;
- }
-
- // Propagate user defined properties to job configuration
- final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration =
- pigAction.getConfiguration().getProperty();
-
- // Propagate user defined properties to pig script as macros
- // passed as parameters -p name=value that can be accessed as $name
- final List<String> paramList = pigAction.getParam();
-
- for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
- org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
- new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
- configProperty.setName(property.getName());
- configProperty.setValue(property.getValue());
- configuration.add(configProperty);
-
- paramList.add(property.getName() + "=" + property.getValue());
- }
- }
-
- private void propagateProcessProperties(org.apache.falcon.oozie.hive.ACTION hiveAction, Process process) {
- org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
- if (processProperties == null) {
- return;
- }
-
- // Propagate user defined properties to job configuration
- final List<org.apache.falcon.oozie.hive.CONFIGURATION.Property> configuration =
- hiveAction.getConfiguration().getProperty();
-
- // Propagate user defined properties to hive script as macros
- // passed as parameters -p name=value that can be accessed as $name
- final List<String> paramList = hiveAction.getParam();
-
- for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
- org.apache.falcon.oozie.hive.CONFIGURATION.Property configProperty =
- new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
- configProperty.setName(property.getName());
- configProperty.setValue(property.getValue());
- configuration.add(configProperty);
-
- paramList.add(property.getName() + "=" + property.getValue());
- }
- }
-
- private Storage.TYPE getStorageType(Cluster cluster, Process process) throws FalconException {
- Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
- if (process.getInputs() == null) {
- return storageType;
- }
-
- for (Input input : process.getInputs().getInputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
- storageType = FeedHelper.getStorageType(feed, cluster);
- if (Storage.TYPE.TABLE == storageType) {
- break;
- }
- }
-
- return storageType;
- }
-
- // creates hive-site.xml configuration in conf dir.
- private void setupHiveConfiguration(Cluster cluster, Path wfPath,
- String prefix) throws FalconException {
- String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
- try {
- FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
- Path confPath = new Path(wfPath, "conf");
- createHiveConf(fs, confPath, catalogUrl, cluster, prefix);
- } catch (IOException e) {
- throw new FalconException(e);
- }
- }
-
- private void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
- Path libPath) throws FalconException {
- if (libPath == null) {
- return;
- }
-
- try {
- final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
- if (fs.isFile(libPath)) { // File, not a Dir
- archiveList.add(libPath.toString());
- return;
- }
-
- // lib path is a directory, add each file under the lib dir to archive
- final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- try {
- return fs.isFile(path) && path.getName().endsWith(".jar");
- } catch (IOException ignore) {
- return false;
- }
- }
- });
-
- for (FileStatus fileStatus : fileStatuses) {
- archiveList.add(fileStatus.getPath().toString());
- }
- } catch (IOException e) {
- throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
- }
- }
-
- @SuppressWarnings("unchecked")
- protected JAXBElement<org.apache.falcon.oozie.hive.ACTION> unMarshalHiveAction(ACTION wfAction) {
- try {
- Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller();
- unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
- return (JAXBElement<org.apache.falcon.oozie.hive.ACTION>)
- unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
- } catch (JAXBException e) {
- throw new RuntimeException("Unable to unmarshall hive action.", e);
- }
- }
-
- protected void marshalHiveAction(ACTION wfAction,
- JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionjaxbElement) {
- try {
- DOMResult hiveActionDOM = new DOMResult();
- Marshaller marshaller = HIVE_ACTION_JAXB_CONTEXT.createMarshaller();
- marshaller.marshal(actionjaxbElement, hiveActionDOM);
- wfAction.setAny(((Document) hiveActionDOM.getNode()).getDocumentElement());
- } catch (JAXBException e) {
- throw new RuntimeException("Unable to marshall hive action.", e);
- }
- }
-}
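
As a worked example of the control defaults computed in initializeControls from the deleted mapper above (which this patch merges into OozieProcessWorkflowBuilder), assume an hourly process with no explicit timeout:

    long frequencyInMillis = 60 * 60 * 1000L;        // hours(1) evaluated to millis
    long timeoutInMillis = frequencyInMillis * 6;    // 21600000 ms; already above the 30-minute floor
    // coordinator <timeout> in minutes:
    String timeout = String.valueOf(timeoutInMillis / (1000 * 60));            // "360"
    // coordinator <throttle>, set only when positive:
    String throttle = String.valueOf(timeoutInMillis / frequencyInMillis * 2); // "12"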