You are viewing a plain-text version of this content. (The canonical link was not preserved in this copy.)
Posted on 2014/03/18 12:41:08 UTC

[3/5] FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index e5a01ca..990fdc5 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -18,22 +18,64 @@
 package org.apache.falcon.workflow;
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconRuntimException;
+import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.ExternalId;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.bundle.COORDINATOR;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.ObjectFactory;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.service.FalconPathFilter;
+import org.apache.falcon.service.SharedLibraryHostingService;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.OozieClient;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import java.util.Arrays;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Set;
  * Base workflow builder for falcon entities.
@@ -44,6 +86,334 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
     private static final Logger LOG = Logger.getLogger(OozieWorkflowBuilder.class);
     protected static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
+    protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
+    protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
+    protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
+    protected static final String MR_QUEUE_NAME = "queueName";
+    protected static final String MR_JOB_PRIORITY = "jobPriority";
+    public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
+        Arrays.asList(new String[]{"recordsize", "succeeded-post-processing", "failed-post-processing", }));
+    protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
+        @Override
+        public boolean accept(Path path) {
+            return path.getName().startsWith("falcon");
+        }
+        @Override
+        public String getJarName(Path path) {
+            String name = path.getName();
+            if (name.endsWith(".jar")) {
+                name = name.substring(0, name.indexOf(".jar"));
+            }
+            return name;
+        }
+    };
+    protected OozieWorkflowBuilder(T entity) {
+        super(entity);
+    }
+    protected Path getCoordPath(Path bundlePath, String coordName) {
+        Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity);
+        return new Path(bundlePath,;
+    }
+    protected abstract Map<String, String> getEntityProperties();
+    public boolean map(Cluster cluster, Path bundlePath) throws FalconException {
+        BUNDLEAPP bundleApp = new BUNDLEAPP();
+        bundleApp.setName(EntityUtil.getWorkflowName(entity).toString());
+        // all the properties are set prior to bundle and coordinators creation
+        List<COORDINATORAPP> coordinators = getCoordinators(cluster, bundlePath);
+        if (coordinators.size() == 0) {
+            return false;
+        }
+        for (COORDINATORAPP coordinatorapp : coordinators) {
+            Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
+            String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
+                EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
+            createLogsDir(cluster, coordPath);
+            COORDINATOR bundleCoord = new COORDINATOR();
+            bundleCoord.setName(coordinatorapp.getName());
+            bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
+            bundleApp.getCoordinator().add(bundleCoord);
+            copySharedLibs(cluster, coordPath);
+        }
+        marshal(cluster, bundleApp, bundlePath);
+        return true;
+    }
+    private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
+        FileStatus[] libs = null;
+        try {
+            libs = fs.listStatus(path);
+        } catch(FileNotFoundException ignore) {
+            //Ok if the libext is not configured
+        }
+        if (libs == null) {
+            return;
+        }
+        for(FileStatus lib : libs) {
+            if (lib.isDir()) {
+                continue;
+            }
+            for(Object obj: wf.getDecisionOrForkOrJoin()) {
+                if (!(obj instanceof ACTION)) {
+                    continue;
+                }
+                ACTION action = (ACTION) obj;
+                List<String> files = null;
+                if (action.getJava() != null) {
+                    files = action.getJava().getFile();
+                } else if (action.getPig() != null) {
+                    files = action.getPig().getFile();
+                } else if (action.getMapReduce() != null) {
+                    files = action.getMapReduce().getFile();
+                }
+                if (files != null) {
+                    files.add(lib.getPath().toString());
+                }
+            }
+        }
+    }
+    protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
+        throws IOException, FalconException {
+        String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+        addExtensionJars(fs, new Path(libext), wf);
+        addExtensionJars(fs, new Path(libext,, wf);
+        if (StringUtils.isNotEmpty(lifecycle)) {
+            addExtensionJars(fs, new Path(libext, + "/" + lifecycle), wf);
+        }
+    }
+    private void copySharedLibs(Cluster cluster, Path coordPath) throws FalconException {
+        try {
+            Path libPath = new Path(coordPath, "lib");
+            SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
+                libPath, cluster, FALCON_JAR_FILTER);
+        } catch (IOException e) {
+            throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
+        }
+    }
+    public abstract List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException;
+    protected org.apache.falcon.oozie.coordinator.CONFIGURATION getCoordConfig(Map<String, String> propMap) {
+        org.apache.falcon.oozie.coordinator.CONFIGURATION conf
+            = new org.apache.falcon.oozie.coordinator.CONFIGURATION();
+        List<org.apache.falcon.oozie.coordinator.CONFIGURATION.Property> props = conf.getProperty();
+        for (Entry<String, String> prop : propMap.entrySet()) {
+            props.add(createCoordProperty(prop.getKey(), prop.getValue()));
+        }
+        return conf;
+    }
+    protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, Path coordPath, String coordName) {
+        Map<String, String> props = new HashMap<String, String>();
+        props.put(ARG.entityName.getPropName(), entity.getName());
+        props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
+        props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
+        props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
+        props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
+        String falconBrokerUrl = StartupProperties.get().getProperty(ARG.brokerUrl.getPropName(),
+            "tcp://localhost:61616?daemon=true");
+        props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
+        String falconBrokerImplClass = StartupProperties.get().getProperty(ARG.brokerImplClass.getPropName(),
+            ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+        props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
+        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
+            DEFAULT_BROKER_MSG_TTL.toString());
+        props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
+        props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
+        props.put("logDir", getStoragePath(new Path(coordPath, "../../logs")));
+        props.put(OozieClient.EXTERNAL_ID,
+            new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
+                "${coord:nominalTime()}").getId());
+        props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
+        try {
+            if (EntityUtil.getLateProcess(entity) == null
+                || EntityUtil.getLateProcess(entity).getLateInputs() == null
+                || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
+                props.put("shouldRecord", "false");
+            } else {
+                props.put("shouldRecord", "true");
+            }
+        } catch (FalconException e) {
+            LOG.error("Unable to get Late Process for entity:" + entity, e);
+            throw new FalconRuntimException(e);
+        }
+        props.put("entityName", entity.getName());
+        props.put("entityType", entity.getEntityType().name().toLowerCase());
+        props.put(ARG.cluster.getPropName(), cluster.getName());
+        if (cluster.getProperties() != null) {
+            for (Property prop : cluster.getProperties().getProperties()) {
+                props.put(prop.getName(), prop.getValue());
+            }
+        }
+        props.put(MR_QUEUE_NAME, "default");
+        props.put(MR_JOB_PRIORITY, "NORMAL");
+        //props in entity override the set props.
+        props.putAll(getEntityProperties());
+        return props;
+    }
+    protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name,
+        String value) {
+        org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop
+            = new org.apache.falcon.oozie.coordinator.CONFIGURATION.Property();
+        prop.setName(name);
+        prop.setValue(value);
+        return prop;
+    }
+    protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
+        throws FalconException {
+        try {
+            Marshaller marshaller = jaxbContext.createMarshaller();
+            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(
+                outPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            OutputStream out = fs.create(outPath);
+            try {
+                marshaller.marshal(jaxbElement, out);
+            } finally {
+                out.close();
+            }
+            if (LOG.isDebugEnabled()) {
+                StringWriter writer = new StringWriter();
+                marshaller.marshal(jaxbElement, writer);
+                LOG.debug("Writing definition to " + outPath + " on cluster " + cluster.getName());
+                LOG.debug(writer.getBuffer());
+            }
+  "Marshalled " + jaxbElement.getDeclaredType() + " to " + outPath);
+        } catch (Exception e) {
+            throw new FalconException("Unable to marshall app object", e);
+        }
+    }
+    private void createLogsDir(Cluster cluster, Path coordPath) throws FalconException {
+        try {
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(
+                coordPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            Path logsDir = new Path(coordPath, "../../logs");
+            fs.mkdirs(logsDir);
+            // logs are copied with in oozie as the user in Post Processing and hence 777 permissions
+            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+            fs.setPermission(logsDir, permission);
+        } catch (Exception e) {
+            throw new FalconException("Unable to create temp dir in " + coordPath, e);
+        }
+    }
+    protected String marshal(Cluster cluster, COORDINATORAPP coord, Path outPath, String name) throws FalconException {
+        if (StringUtils.isEmpty(name)) {
+            name = "coordinator";
+        }
+        name = name + ".xml";
+        marshal(cluster, new ObjectFactory().createCoordinatorApp(coord), OozieUtils.COORD_JAXB_CONTEXT,
+            new Path(outPath, name));
+        return name;
+    }
+    protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
+        marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
+            OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
+    }
+    protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
+        marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
+            OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
+    }
+    protected String getStoragePath(Path path) {
+        if (path != null) {
+            return getStoragePath(path.toString());
+        }
+        return null;
+    }
+    protected String getStoragePath(String path) {
+        if (StringUtils.isNotEmpty(path)) {
+            if (new Path(path).toUri().getScheme() == null) {
+                path = "${nameNode}" + path;
+            }
+        }
+        return path;
+    }
+    protected WORKFLOWAPP getWorkflowTemplate(String template) throws FalconException {
+        InputStream resourceAsStream = null;
+        try {
+            resourceAsStream = OozieWorkflowBuilder.class.getResourceAsStream(template);
+            Unmarshaller unmarshaller = OozieUtils.WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
+            @SuppressWarnings("unchecked")
+            JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
+                resourceAsStream);
+            return jaxbElement.getValue();
+        } catch (JAXBException e) {
+            throw new FalconException(e);
+        } finally {
+            IOUtils.closeQuietly(resourceAsStream);
+        }
+    }
+    protected COORDINATORAPP getCoordinatorTemplate(String template) throws FalconException {
+        InputStream resourceAsStream = null;
+        try {
+            resourceAsStream = OozieWorkflowBuilder.class.getResourceAsStream(template);
+            Unmarshaller unmarshaller = OozieUtils.COORD_JAXB_CONTEXT.createUnmarshaller();
+            @SuppressWarnings("unchecked")
+            JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>)
+                unmarshaller.unmarshal(resourceAsStream);
+            return jaxbElement.getValue();
+        } catch (JAXBException e) {
+            throw new FalconException(e);
+        } finally {
+            IOUtils.closeQuietly(resourceAsStream);
+        }
+    }
+    protected void createHiveConf(FileSystem fs, Path confPath, String metastoreUrl,
+        Cluster cluster, String prefix) throws IOException {
+        Configuration hiveConf = new Configuration(false);
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUrl);
+        hiveConf.set("hive.metastore.local", "false");
+        if (UserGroupInformation.isSecurityEnabled()) {
+            hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+                ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL));
+            hiveConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+        }
+        OutputStream out = null;
+        try {
+            out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
+            hiveConf.writeXml(out);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+    protected void decorateWithOozieRetries(ACTION action) {
+        Properties props = RuntimeProperties.get();
+        action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
+        action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
+    }
     protected Properties createAppProperties(String clusterName, Path bundlePath, String user) throws FalconException {
         Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
         Properties properties = new Properties();
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index ac8862e..d819e93 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -116,7 +116,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         if (!schedClusters.isEmpty()) {
             WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
-            Map<String, Properties> newFlows = builder.newWorkflowSchedule(entity, schedClusters);
+            Map<String, Properties> newFlows = builder.newWorkflowSchedule(schedClusters.toArray(new
+                String[schedClusters.size()]));
             for (Map.Entry<String, Properties> entry : newFlows.entrySet()) {
                 String cluster = entry.getKey();
       "Scheduling " + entity.toShortString() + " on cluster " + cluster);
@@ -380,7 +381,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
             Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
             List<Instance> runInstances = new ArrayList<Instance>();
-            String[] wfNames = builder.getWorkflowNames(entity);
+            String[] wfNames = builder.getWorkflowNames();
             List<String> coordNames = new ArrayList<String>();
             for (String wfName : wfNames) {
                 if (EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString().equals(wfName)) {
@@ -1059,11 +1060,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     private String scheduleForUpdate(Entity entity, String cluster, Date startDate, String user)
         throws FalconException {
-        WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
-        Properties bundleProps = builder.newWorkflowSchedule(entity, startDate, cluster, user);
+        Entity clone = entity.copy();
+        EntityUtil.setStartDate(entity, cluster, startDate);
+        WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, clone);
+        Map<String, Properties> bundleProps = builder.newWorkflowSchedule(cluster);"Scheduling " + entity.toShortString() + " on cluster " + cluster + " with props " + bundleProps);
-        if (bundleProps != null) {
-            return scheduleEntity(cluster, bundleProps, entity);
+        if (bundleProps != null && bundleProps.size() > 0) {
+            return scheduleEntity(cluster, bundleProps.get(cluster), entity);
         } else {
   "No new workflow to be scheduled for this " + entity.toShortString());
             return null;
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
deleted file mode 100644
index e638961..0000000
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ /dev/null
@@ -1,833 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.converter;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Property;
-import org.apache.falcon.entity.v0.process.Workflow;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.oozie.coordinator.CONTROLS;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.DATAIN;
-import org.apache.falcon.oozie.coordinator.DATAOUT;
-import org.apache.falcon.oozie.coordinator.DATASETS;
-import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
-import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.coordinator.WORKFLOW;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DELETE;
-import org.apache.falcon.oozie.workflow.PIG;
-import org.apache.falcon.oozie.workflow.PREPARE;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.update.UpdateHelper;
-import org.apache.hadoop.fs.*;
-import org.apache.xerces.dom.ElementNSImpl;
-import org.w3c.dom.Document;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.dom.DOMResult;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
- * This class maps the Falcon entities into Oozie artifacts.
- */
-public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
-    private static final String DEFAULT_WF_TEMPLATE = "/config/workflow/process-parent-workflow.xml";
-    private static final int THIRTY_MINUTES = 30 * 60 * 1000;
-    public OozieProcessMapper(Process entity) {
-        super(entity);
-    }
-    @Override
-    protected List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
-            Process process = getEntity();
-            //Copy user workflow and lib to staging dir
-            Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(process.getWorkflow().getPath()),
-                    new Path(bundlePath, EntityUtil.PROCESS_USER_DIR));
-            if (process.getWorkflow().getLib() != null && fs.exists(new Path(process.getWorkflow().getLib()))) {
-                checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(process.getWorkflow().getLib()),
-                        new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR)));
-            }
-            writeChecksums(fs, new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
-        } catch (IOException e) {
-            throw new FalconException("Failed to copy user workflow/lib", e);
-        }
-        List<COORDINATORAPP> apps = new ArrayList<COORDINATORAPP>();
-        apps.add(createDefaultCoordinator(cluster, bundlePath));
-        return apps;
-    }
-    private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
-        try {
-            FSDataOutputStream stream = fs.create(path);
-            try {
-                for (Map.Entry<String, String> entry : checksums.entrySet()) {
-                    stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-                }
-            } finally {
-                stream.close();
-            }
-        } catch (IOException e) {
-            throw new FalconException("Failed to copy user workflow/lib", e);
-        }
-    }
-    private Path getUserWorkflowPath(Cluster cluster, Path bundlePath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
-            Process process = getEntity();
-            Path wfPath = new Path(process.getWorkflow().getPath());
-            if (fs.isFile(wfPath)) {
-                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
-            } else {
-                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR);
-            }
-        } catch(IOException e) {
-            throw new FalconException("Failed to get workflow path", e);
-        }
-    }
-    private Path getUserLibPath(Cluster cluster, Path bundlePath) throws FalconException {
-        try {
-            Process process = getEntity();
-            if (process.getWorkflow().getLib() == null) {
-                return null;
-            }
-            Path libPath = new Path(process.getWorkflow().getLib());
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
-            if (fs.isFile(libPath)) {
-                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
-            } else {
-                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR);
-            }
-        } catch(IOException e) {
-            throw new FalconException("Failed to get user lib path", e);
-        }
-    }
-    /**
-     * Creates default oozie coordinator.
-     *
-     * @param cluster    - Cluster for which the coordiantor app need to be created
-     * @param bundlePath - bundle path
-     * @return COORDINATORAPP
-     * @throws FalconException on Error
-     */
-    public COORDINATORAPP createDefaultCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
-        Process process = getEntity();
-        if (process == null) {
-            return null;
-        }
-        String coordName = EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString();
-        Path coordPath = getCoordPath(bundlePath, coordName);
-        // coord attributes
-        initializeCoordAttributes(cluster, process, coord, coordName);
-        CONTROLS controls = initializeControls(process); // controls
-        coord.setControls(controls);
-        // Configuration
-        Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
-        initializeInputPaths(cluster, process, coord, props); // inputs
-        initializeOutputPaths(cluster, process, coord, props);  // outputs
-        Workflow processWorkflow = process.getWorkflow();
-        propagateUserWorkflowProperties(processWorkflow, props, process.getName());
-        // create parent wf
-        createWorkflow(cluster, process, processWorkflow, coordName, coordPath);
-        WORKFLOW wf = new WORKFLOW();
-        wf.setAppPath(getStoragePath(coordPath.toString()));
-        wf.setConfiguration(getCoordConfig(props));
-        // set coord action to parent wf
-        org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
-        action.setWorkflow(wf);
-        coord.setAction(action);
-        return coord;
-    }
-    private void initializeCoordAttributes(Cluster cluster, Process process, COORDINATORAPP coord, String coordName) {
-        coord.setName(coordName);
-        org.apache.falcon.entity.v0.process.Cluster processCluster =
-                ProcessHelper.getCluster(process, cluster.getName());
-        coord.setStart(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()));
-        coord.setEnd(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()));
-        coord.setTimezone(process.getTimezone().getID());
-        coord.setFrequency("${coord:" + process.getFrequency().toString() + "}");
-    }
-    private CONTROLS initializeControls(Process process)
-        throws FalconException {
-        CONTROLS controls = new CONTROLS();
-        controls.setConcurrency(String.valueOf(process.getParallel()));
-        controls.setExecution(process.getOrder().name());
-        Frequency timeout = process.getTimeout();
-        long frequencyInMillis = ExpressionHelper.get().evaluate(process.getFrequency().toString(), Long.class);
-        long timeoutInMillis;
-        if (timeout != null) {
-            timeoutInMillis = ExpressionHelper.get().
-                    evaluate(process.getTimeout().toString(), Long.class);
-        } else {
-            timeoutInMillis = frequencyInMillis * 6;
-            if (timeoutInMillis < THIRTY_MINUTES) {
-                timeoutInMillis = THIRTY_MINUTES;
-            }
-        }
-        controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
-        if (timeoutInMillis / frequencyInMillis * 2 > 0) {
-            controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
-        }
-        return controls;
-    }
-    private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
-                                      Map<String, String> props) throws FalconException {
-        if (process.getInputs() == null) {
-            props.put("falconInputFeeds", "NONE");
-            props.put("falconInPaths", "IGNORE");
-            return;
-        }
-        List<String> inputFeeds = new ArrayList<String>();
-        List<String> inputPaths = new ArrayList<String>();
-        List<String> inputFeedStorageTypes = new ArrayList<String>();
-        for (Input input : process.getInputs().getInputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-            if (!input.isOptional()) {
-                if (coord.getDatasets() == null) {
-                    coord.setDatasets(new DATASETS());
-                }
-                if (coord.getInputEvents() == null) {
-                    coord.setInputEvents(new INPUTEVENTS());
-                }
-                SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
-                coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-                DATAIN datain = createDataIn(input);
-                coord.getInputEvents().getDataIn().add(datain);
-            }
-            String inputExpr = null;
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
-                props.put(input.getName(), inputExpr);
-            } else if (storage.getType() == Storage.TYPE.TABLE) {
-                inputExpr = "${coord:dataIn('" + input.getName() + "')}";
-                propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
-            }
-            inputFeeds.add(feed.getName());
-            inputPaths.add(inputExpr);
-            inputFeedStorageTypes.add(storage.getType().name());
-        }
-        propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
-    }
-    private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
-                                             List<String> inputFeedStorageTypes, Map<String, String> props) {
-        // populate late data handler - should-record action
-        props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
-        props.put("falconInPaths", join(inputPaths.iterator(), '#'));
-        // storage type for each corresponding feed sent as a param to LateDataHandler
-        // needed to compute usage based on storage type in LateDataHandler
-        props.put("falconInputFeedStorageTypes", join(inputFeedStorageTypes.iterator(), '#'));
-    }
-    private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
-                                       Map<String, String> props) throws FalconException {
-        if (process.getOutputs() == null) {
-            props.put(ARG.feedNames.getPropName(), "NONE");
-            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
-            return;
-        }
-        if (coord.getDatasets() == null) {
-            coord.setDatasets(new DATASETS());
-        }
-        if (coord.getOutputEvents() == null) {
-            coord.setOutputEvents(new OUTPUTEVENTS());
-        }
-        List<String> outputFeeds = new ArrayList<String>();
-        List<String> outputPaths = new ArrayList<String>();
-        for (Output output : process.getOutputs().getOutputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-            SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
-            coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-            DATAOUT dataout = createDataOut(output);
-            coord.getOutputEvents().getDataOut().add(dataout);
-            String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
-            outputFeeds.add(feed.getName());
-            outputPaths.add(outputExpr);
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                props.put(output.getName(), outputExpr);
-                propagateFileSystemProperties(output, feed, cluster, coord, storage, props);
-            } else if (storage.getType() == Storage.TYPE.TABLE) {
-                propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
-            }
-        }
-        // Output feed name and path for parent workflow
-        props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
-        props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
-    }
-    private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
-                                      String datasetName, LocationType locationType) throws FalconException {
-        SYNCDATASET syncdataset = new SYNCDATASET();
-        syncdataset.setName(datasetName);
-        syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-        String uriTemplate = storage.getUriTemplate(locationType);
-        if (storage.getType() == Storage.TYPE.TABLE) {
-            uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
-        }
-        syncdataset.setUriTemplate(uriTemplate);
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-        syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
-        syncdataset.setTimezone(feed.getTimezone().getID());
-        if (feed.getAvailabilityFlag() == null) {
-            syncdataset.setDoneFlag("");
-        } else {
-            syncdataset.setDoneFlag(feed.getAvailabilityFlag());
-        }
-        return syncdataset;
-    }
-    private DATAOUT createDataOut(Output output) {
-        DATAOUT dataout = new DATAOUT();
-        dataout.setName(output.getName());
-        dataout.setDataset(output.getName());
-        dataout.setInstance(getELExpression(output.getInstance()));
-        return dataout;
-    }
-    private DATAIN createDataIn(Input input) {
-        DATAIN datain = new DATAIN();
-        datain.setName(input.getName());
-        datain.setDataset(input.getName());
-        datain.setStartInstance(getELExpression(input.getStart()));
-        datain.setEndInstance(getELExpression(input.getEnd()));
-        return datain;
-    }
-    private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
-                                               Storage storage, Map<String, String> props)
-        throws FalconException {
-        // stats and meta paths
-        createOutputEvent(output, feed, cluster, LocationType.STATS, coord, props, storage);
-        createOutputEvent(output, feed, cluster, LocationType.META, coord, props, storage);
-        createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
-    }
-    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
-                                   COORDINATORAPP coord, Map<String, String> props, Storage storage)
-        throws FalconException {
-        String name = output.getName();
-        String type = locType.name().toLowerCase();
-        SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
-        coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
-        DATAOUT dataout = new DATAOUT();
-        dataout.setName(name + type);
-        dataout.setDataset(name + type);
-        dataout.setInstance(getELExpression(output.getInstance()));
-        OUTPUTEVENTS outputEvents = coord.getOutputEvents();
-        if (outputEvents == null) {
-            outputEvents = new OUTPUTEVENTS();
-            coord.setOutputEvents(outputEvents);
-        }
-        outputEvents.getDataOut().add(dataout);
-        String outputExpr = "${coord:dataOut('" + name + type + "')}";
-        props.put(name + "." + type, outputExpr);
-    }
-    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
-    private void propagateCommonCatalogTableProperties(CatalogStorage tableStorage,
-                                                       Map<String, String> props, String prefix) {
-        props.put(prefix + "_storage_type", tableStorage.getType().name());
-        props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
-        props.put(prefix + "_database", tableStorage.getDatabase());
-        props.put(prefix + "_table", tableStorage.getTable());
-    }
-    private void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage,
-                                                 Map<String, String> props) {
-        String prefix = "falcon_" + input.getName();
-        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
-        props.put(prefix + "_partition_filter_pig",
-                "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
-        props.put(prefix + "_partition_filter_hive",
-                "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
-        props.put(prefix + "_partition_filter_java",
-                "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
-    }
-    private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,
-                                                 Map<String, String> props) {
-        String prefix = "falcon_" + output.getName();
-        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
-        props.put(prefix + "_dataout_partitions",
-                "${coord:dataOutPartitions('" + output.getName() + "')}");
-        props.put(prefix + "_dated_partition_value", "${coord:dataOutPartitionValue('"
-                + output.getName() + "', '" + tableStorage.getDatedPartitionKey() + "')}");
-    }
-    private String join(Iterator<String> itr, char sep) {
-        String joinedStr = StringUtils.join(itr, sep);
-        if (joinedStr.isEmpty()) {
-            joinedStr = "null";
-        }
-        return joinedStr;
-    }
-    private String getELExpression(String expr) {
-        if (expr != null) {
-            expr = "${" + expr + "}";
-        }
-        return expr;
-    }
-    @Override
-    protected Map<String, String> getEntityProperties() {
-        Process process = getEntity();
-        Map<String, String> props = new HashMap<String, String>();
-        if (process.getProperties() != null) {
-            for (Property prop : process.getProperties().getProperties()) {
-                props.put(prop.getName(), prop.getValue());
-            }
-        }
-        return props;
-    }
-    private void propagateUserWorkflowProperties(Workflow processWorkflow,
-                                                 Map<String, String> props, String processName) {
-        props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
-                processWorkflow.getName(), processName));
-        props.put("userWorkflowVersion", processWorkflow.getVersion());
-        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
-    }
-    protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
-                                  String wfName, Path parentWfPath) throws FalconException {
-        WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
-        wfApp.setName(wfName);
-        try {
-            addLibExtensionsToWorkflow(cluster, wfApp, EntityType.PROCESS, null);
-        } catch (IOException e) {
-            throw new FalconException("Failed to add library extensions for the workflow", e);
-        }
-        String userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent()).toString();
-        EngineType engineType = processWorkflow.getEngine();
-        for (Object object : wfApp.getDecisionOrForkOrJoin()) {
-            if (!(object instanceof ACTION)) {
-                continue;
-            }
-            ACTION action = (ACTION) object;
-            String actionName = action.getName();
-            if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
-                action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
-            } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
-                decoratePIGAction(cluster, process, action.getPig(), parentWfPath);
-            } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
-                decorateHiveAction(cluster, process, action, parentWfPath);
-            } else if (FALCON_ACTIONS.contains(actionName)) {
-                decorateWithOozieRetries(action);
-            }
-        }
-        //Create parent workflow
-        marshal(cluster, wfApp, parentWfPath);
-    }
-    private void decoratePIGAction(Cluster cluster, Process process,
-                                   PIG pigAction, Path parentWfPath) throws FalconException {
-        Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
-        pigAction.setScript("${nameNode}" + userWfPath.toString());
-        addPrepareDeleteOutputPath(process, pigAction);
-        final List<String> paramList = pigAction.getParam();
-        addInputFeedsAsParams(paramList, process, cluster, EngineType.PIG.name().toLowerCase());
-        addOutputFeedsAsParams(paramList, process, cluster);
-        propagateProcessProperties(pigAction, process);
-        Storage.TYPE storageType = getStorageType(cluster, process);
-        if (Storage.TYPE.TABLE == storageType) {
-            // adds hive-site.xml in pig classpath
-            setupHiveConfiguration(cluster, parentWfPath, ""); // DO NOT ADD PREFIX!!!
-            pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
-        }
-        addArchiveForCustomJars(cluster, pigAction.getArchive(),
-                getUserLibPath(cluster, parentWfPath.getParent()));
-    }
-    private void decorateHiveAction(Cluster cluster, Process process, ACTION wfAction,
-                                    Path parentWfPath) throws FalconException {
-        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = unMarshalHiveAction(wfAction);
-        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
-        Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
-        hiveAction.setScript("${nameNode}" + userWfPath.toString());
-        addPrepareDeleteOutputPath(process, hiveAction);
-        final List<String> paramList = hiveAction.getParam();
-        addInputFeedsAsParams(paramList, process, cluster, EngineType.HIVE.name().toLowerCase());
-        addOutputFeedsAsParams(paramList, process, cluster);
-        propagateProcessProperties(hiveAction, process);
-        setupHiveConfiguration(cluster, parentWfPath, "falcon-");
-        addArchiveForCustomJars(cluster, hiveAction.getArchive(),
-                getUserLibPath(cluster, parentWfPath.getParent()));
-        marshalHiveAction(wfAction, actionJaxbElement);
-    }
-    private void addPrepareDeleteOutputPath(Process process,
-                                            PIG pigAction) throws FalconException {
-        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
-        if (deleteOutputPathList.isEmpty()) {
-            return;
-        }
-        final PREPARE prepare = new PREPARE();
-        final List<DELETE> deleteList = prepare.getDelete();
-        for (String deletePath : deleteOutputPathList) {
-            final DELETE delete = new DELETE();
-            delete.setPath(deletePath);
-            deleteList.add(delete);
-        }
-        if (!deleteList.isEmpty()) {
-            pigAction.setPrepare(prepare);
-        }
-    }
-    private void addPrepareDeleteOutputPath(Process process,
-                                            org.apache.falcon.oozie.hive.ACTION hiveAction)
-        throws FalconException {
-        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
-        if (deleteOutputPathList.isEmpty()) {
-            return;
-        }
-        org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
-        List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
-        for (String deletePath : deleteOutputPathList) {
-            org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
-            delete.setPath(deletePath);
-            deleteList.add(delete);
-        }
-        if (!deleteList.isEmpty()) {
-            hiveAction.setPrepare(prepare);
-        }
-    }
-    private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
-        final List<String> deleteList = new ArrayList<String>();
-        if (process.getOutputs() == null) {
-            return deleteList;
-        }
-        for (Output output : process.getOutputs().getOutputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-            if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
-                continue; // prepare delete only applies to FileSystem storage
-            }
-            deleteList.add("${wf:conf('" + output.getName() + "')}");
-        }
-        return deleteList;
-    }
-    private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
-                                       String engineType) throws FalconException {
-        if (process.getInputs() == null) {
-            return;
-        }
-        for (Input input : process.getInputs().getInputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-            final String inputName = input.getName();
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
-            } else if (storage.getType() == Storage.TYPE.TABLE) {
-                final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
-                Map<String, String> props = new HashMap<String, String>();
-                propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
-                for (String key : props.keySet()) {
-                    paramList.add(key + "=${wf:conf('" + key + "')}");
-                }
-                paramList.add(paramName + "_filter=${wf:conf('"
-                        + paramName + "_partition_filter_" + engineType + "')}");
-            }
-        }
-    }
-    private void addOutputFeedsAsParams(List<String> paramList, Process process,
-                                        Cluster cluster) throws FalconException {
-        if (process.getOutputs() == null) {
-            return;
-        }
-        for (Output output : process.getOutputs().getOutputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                final String outputName = output.getName();  // no prefix for backwards compatibility
-                paramList.add(outputName + "=${" + outputName + "}");
-            } else if (storage.getType() == Storage.TYPE.TABLE) {
-                Map<String, String> props = new HashMap<String, String>();
-                propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
-                for (String key : props.keySet()) {
-                    paramList.add(key + "=${wf:conf('" + key + "')}");
-                }
-            }
-        }
-    }
-    private void propagateProcessProperties(PIG pigAction, Process process) {
-        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
-        if (processProperties == null) {
-            return;
-        }
-        // Propagate user defined properties to job configuration
-        final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration =
-                pigAction.getConfiguration().getProperty();
-        // Propagate user defined properties to pig script as macros
-        // passed as parameters -p name=value that can be accessed as $name
-        final List<String> paramList = pigAction.getParam();
-        for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
-            org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
-                    new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
-            configProperty.setName(property.getName());
-            configProperty.setValue(property.getValue());
-            configuration.add(configProperty);
-            paramList.add(property.getName() + "=" + property.getValue());
-        }
-    }
-    private void propagateProcessProperties(org.apache.falcon.oozie.hive.ACTION hiveAction, Process process) {
-        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
-        if (processProperties == null) {
-            return;
-        }
-        // Propagate user defined properties to job configuration
-        final List<org.apache.falcon.oozie.hive.CONFIGURATION.Property> configuration =
-                hiveAction.getConfiguration().getProperty();
-        // Propagate user defined properties to pig script as macros
-        // passed as parameters -p name=value that can be accessed as $name
-        final List<String> paramList = hiveAction.getParam();
-        for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
-            org.apache.falcon.oozie.hive.CONFIGURATION.Property configProperty =
-                    new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
-            configProperty.setName(property.getName());
-            configProperty.setValue(property.getValue());
-            configuration.add(configProperty);
-            paramList.add(property.getName() + "=" + property.getValue());
-        }
-    }
-    private Storage.TYPE getStorageType(Cluster cluster, Process process) throws FalconException {
-        Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
-        if (process.getInputs() == null) {
-            return storageType;
-        }
-        for (Input input : process.getInputs().getInputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
-            storageType = FeedHelper.getStorageType(feed, cluster);
-            if (Storage.TYPE.TABLE == storageType) {
-                break;
-            }
-        }
-        return storageType;
-    }
-    // creates hive-site.xml configuration in conf dir.
-    private void setupHiveConfiguration(Cluster cluster, Path wfPath,
-                                        String prefix) throws FalconException {
-        String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
-            Path confPath = new Path(wfPath, "conf");
-            createHiveConf(fs, confPath, catalogUrl, cluster, prefix);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-    }
-    private void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
-                                         Path libPath) throws FalconException {
-        if (libPath == null) {
-            return;
-        }
-        try {
-            final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
-            if (fs.isFile(libPath)) {  // File, not a Dir
-                archiveList.add(libPath.toString());
-                return;
-            }
-            // lib path is a directory, add each file under the lib dir to archive
-            final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() {
-                @Override
-                public boolean accept(Path path) {
-                    try {
-                        return fs.isFile(path) && path.getName().endsWith(".jar");
-                    } catch (IOException ignore) {
-                        return false;
-                    }
-                }
-            });
-            for (FileStatus fileStatus : fileStatuses) {
-                archiveList.add(fileStatus.getPath().toString());
-            }
-        } catch (IOException e) {
-            throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
-        }
-    }
-    @SuppressWarnings("unchecked")
-    protected JAXBElement<org.apache.falcon.oozie.hive.ACTION> unMarshalHiveAction(ACTION wfAction) {
-        try {
-            Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller();
-            unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
-            return (JAXBElement<org.apache.falcon.oozie.hive.ACTION>)
-                    unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
-        } catch (JAXBException e) {
-            throw new RuntimeException("Unable to unmarshall hive action.", e);
-        }
-    }
-    protected void marshalHiveAction(ACTION wfAction,
-                                     JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionjaxbElement) {
-        try {
-            DOMResult hiveActionDOM = new DOMResult();
-            Marshaller marshaller = HIVE_ACTION_JAXB_CONTEXT.createMarshaller();
-            marshaller.marshal(actionjaxbElement, hiveActionDOM);
-            wfAction.setAny(((Document) hiveActionDOM.getNode()).getDocumentElement());
-        } catch (JAXBException e) {
-            throw new RuntimeException("Unable to marshall hive action.", e);
-        }
-    }