Posted to commits@falcon.apache.org by sh...@apache.org on 2014/07/10 08:57:32 UTC

[3/9] FALCON-369 Refactor workflow builder. Contributed by Shwetha GS

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/feed/src-cluster.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/src-cluster.xml b/oozie/src/test/resources/feed/src-cluster.xml
new file mode 100644
index 0000000..730f8d2
--- /dev/null
+++ b/oozie/src/test/resources/feed/src-cluster.xml
@@ -0,0 +1,40 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<cluster colo="gs1" description="" name="corp1" xmlns="uri:falcon:cluster:0.1"
+        >
+    <interfaces>
+        <interface type="readonly" endpoint="http://localhost:50070"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="thrift://localhost:49093" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+        <location name="staging" path="/projects/falcon/staging"/>
+    </locations>
+    <properties>
+        <property name="separator" value="-"/>
+    </properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/feed/table-replication-feed.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/table-replication-feed.xml b/oozie/src/test/resources/feed/table-replication-feed.xml
new file mode 100644
index 0000000..4c610f6
--- /dev/null
+++ b/oozie/src/test/resources/feed/table-replication-feed.xml
@@ -0,0 +1,42 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="raw-logs-table" xmlns="uri:falcon:feed:0.1">
+
+    <frequency>minutes(20)</frequency>
+    <timezone>UTC</timezone>
+
+    <clusters>
+        <cluster name="corp1" type="source" delay="minutes(40)">
+            <validity start="2010-01-01T00:00Z" end="2010-01-01T02:00Z"/>
+            <retention limit="minutes(5)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="corp2" type="target">
+            <validity start="2010-01-01T00:00Z" end="2010-01-01T02:00Z"/>
+            <retention limit="minutes(7)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <table uri="catalog:target_db:target_clicks_table#ds=${YEAR}${MONTH}${DAY};region=${region}" />
+        </cluster>
+    </clusters>
+
+    <table uri="catalog:source_db:source_clicks_table#ds=${YEAR}${MONTH}${DAY};region=${region}" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+
+</feed>
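
A side note on the table URIs above (not part of the patch): the
ds=${YEAR}${MONTH}${DAY} fragment is a dated-partition template that Falcon
resolves per feed instance. The following standalone Java sketch is purely
illustrative -- Falcon performs this expansion internally, and the instance
date and region value here are made up:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class PartitionUriDemo {
        public static void main(String[] args) {
            String template = "catalog:source_db:source_clicks_table"
                    + "#ds=${YEAR}${MONTH}${DAY};region=${region}";
            // Format a (hypothetical) instance time the way the template expects.
            SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMdd");
            fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
            String ds = fmt.format(new Date(0));          // 1970-01-01 -> "19700101"
            String resolved = template
                    .replace("${YEAR}${MONTH}${DAY}", ds)
                    .replace("${region}", "us-east");     // assumed region value
            System.out.println(resolved);
            // catalog:source_db:source_clicks_table#ds=19700101;region=us-east
        }
    }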

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/feed/trg-cluster-alpha.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/trg-cluster-alpha.xml b/oozie/src/test/resources/feed/trg-cluster-alpha.xml
new file mode 100644
index 0000000..1fb07cb
--- /dev/null
+++ b/oozie/src/test/resources/feed/trg-cluster-alpha.xml
@@ -0,0 +1,39 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<cluster colo="ua1" description="" name="alpha" xmlns="uri:falcon:cluster:0.1">
+    <interfaces>
+        <interface type="readonly" endpoint="http://localhost:50070"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="thrift://localhost:59093" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+        <location name="staging" path="/projects/falcon/staging2"/>
+    </locations>
+    <properties>
+        <property name="separator" value="-"/>
+    </properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/feed/trg-cluster-beta.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/trg-cluster-beta.xml b/oozie/src/test/resources/feed/trg-cluster-beta.xml
new file mode 100644
index 0000000..0bf0bcd
--- /dev/null
+++ b/oozie/src/test/resources/feed/trg-cluster-beta.xml
@@ -0,0 +1,39 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<cluster colo="ua2" description="" name="beta" xmlns="uri:falcon:cluster:0.1">
+    <interfaces>
+        <interface type="readonly" endpoint="http://localhost:50070"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="thrift://localhost:59093" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+        <location name="staging" path="/projects/falcon/staging2"/>
+    </locations>
+    <properties>
+        <property name="separator" value="-"/>
+    </properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/test/resources/feed/trg-cluster.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/trg-cluster.xml b/oozie/src/test/resources/feed/trg-cluster.xml
new file mode 100644
index 0000000..8260fda
--- /dev/null
+++ b/oozie/src/test/resources/feed/trg-cluster.xml
@@ -0,0 +1,40 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<cluster colo="gs2" description="" name="corp2" xmlns="uri:falcon:cluster:0.1"
+        >
+    <interfaces>
+        <interface type="readonly" endpoint="http://localhost:50070"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+                   version="3.1"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="thrift://localhost:59093" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+        <location name="staging" path="/projects/falcon/staging2"/>
+    </locations>
+    <properties>
+        <property name="separator" value="-"/>
+    </properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7130352..25c498e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -425,9 +425,7 @@
         <module>test-util</module>
         <module>hadoop-dependencies</module>
         <module>hadoop-webapp</module>
-        <module>feed</module>
         <module>messaging</module>
-        <module>process</module>
         <module>oozie-el-extensions</module>
         <module>oozie</module>
         <module>acquisition</module>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/prism/src/main/java/org/apache/falcon/util/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/util/EmbeddedServer.java b/prism/src/main/java/org/apache/falcon/util/EmbeddedServer.java
index a9c9643..b3a5524 100644
--- a/prism/src/main/java/org/apache/falcon/util/EmbeddedServer.java
+++ b/prism/src/main/java/org/apache/falcon/util/EmbeddedServer.java
@@ -48,6 +48,7 @@ public class EmbeddedServer {
     public void start() throws Exception {
         Services.get().reset();
         server.start();
+        server.join();
     }
 
     public void stop() throws Exception {
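
A note on this change (not part of the patch): EmbeddedServer wraps a Jetty
Server, and Jetty's start() returns as soon as the connectors are listening.
Adding server.join() makes start() block the calling thread until stop() is
invoked, which keeps a standalone launcher process alive. A minimal sketch of
the same pattern, written against the current org.eclipse.jetty API (Falcon
itself used an older Jetty at the time):

    import org.eclipse.jetty.server.Server;
    import org.eclipse.jetty.server.handler.DefaultHandler;

    public class BlockingServerDemo {
        public static void main(String[] args) throws Exception {
            Server server = new Server(41000);        // any free port
            server.setHandler(new DefaultHandler());
            server.start();  // returns once connectors are up
            server.join();   // blocks until server.stop() is called elsewhere
        }
    }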

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/process/pom.xml
----------------------------------------------------------------------
diff --git a/process/pom.xml b/process/pom.xml
deleted file mode 100644
index c1ee74d..0000000
--- a/process/pom.xml
+++ /dev/null
@@ -1,118 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.falcon</groupId>
-        <artifactId>falcon-main</artifactId>
-        <version>0.6-incubating-SNAPSHOT</version>
-    </parent>
-    <artifactId>falcon-process</artifactId>
-    <description>Apache Falcon Process Module</description>
-    <name>Apache Falcon Process</name>
-    <packaging>jar</packaging>
-
-    <profiles>
-        <profile>
-            <id>hadoop-1</id>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-core</artifactId>
-                </dependency>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-test</artifactId>
-                </dependency>
-            </dependencies>
-        </profile>
-        <profile>
-            <id>hadoop-2</id>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-common</artifactId>
-                </dependency>
-                <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-hdfs</artifactId>
-                </dependency>
-                <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-hdfs</artifactId>
-                       <classifier>tests</classifier>
-                 </dependency>
-                 <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-common</artifactId>
-                       <classifier>tests</classifier>
-                 </dependency>
-            </dependencies>
-        </profile>
-    </profiles>
-
-    <dependencies>
-        <dependency>
-            <groupId>commons-cli</groupId>
-            <artifactId>commons-cli</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.falcon</groupId>
-            <artifactId>falcon-oozie-adaptor</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.falcon</groupId>
-            <artifactId>falcon-common</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.falcon</groupId>
-            <artifactId>falcon-feed</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.falcon</groupId>
-            <artifactId>falcon-test-util</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.falcon</groupId>
-            <artifactId>falcon-messaging</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.testng</groupId>
-            <artifactId>testng</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>xerces</groupId>
-            <artifactId>xercesImpl</artifactId>
-        </dependency>
-    </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
deleted file mode 100644
index 3751f95..0000000
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ /dev/null
@@ -1,904 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Property;
-import org.apache.falcon.entity.v0.process.Workflow;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.oozie.coordinator.CONTROLS;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.DATAIN;
-import org.apache.falcon.oozie.coordinator.DATAOUT;
-import org.apache.falcon.oozie.coordinator.DATASETS;
-import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
-import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.coordinator.WORKFLOW;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DELETE;
-import org.apache.falcon.oozie.workflow.PIG;
-import org.apache.falcon.oozie.workflow.PREPARE;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.update.UpdateHelper;
-import org.apache.falcon.util.OozieUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.oozie.client.CoordinatorJob.Timeunit;
-import org.apache.oozie.client.OozieClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.xml.bind.JAXBElement;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Oozie workflow builder for falcon entities.
- */
-public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
-    private static final Logger LOG = LoggerFactory.getLogger(OozieProcessWorkflowBuilder.class);
-
-    private static final Set<String> FALCON_PROCESS_HIVE_ACTIONS = new HashSet<String>(
-            Arrays.asList(new String[]{"recordsize", "user-oozie-workflow", "user-pig-job", "user-hive-job", }));
-
-    public OozieProcessWorkflowBuilder(Process entity) {
-        super(entity);
-    }
-
-    @Override
-    public Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException {
-        Map<String, Properties> propertiesMap = new HashMap<String, Properties>();
-
-        for (String clusterName : clusters) {
-            org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(entity, clusterName);
-            if (processCluster.getValidity().getStart().compareTo(processCluster.getValidity().getEnd()) >= 0) {
-                LOG.info("process validity start >= end for cluster {}. Skipping schedule", clusterName);
-                break;
-            }
-
-            Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, processCluster.getName());
-            Path bundlePath = EntityUtil.getNewStagingPath(cluster, entity);
-            map(cluster, bundlePath);
-            Properties properties = createAppProperties(clusterName, bundlePath, CurrentUser.getUser());
-
-            //Add libpath
-            String libPath = entity.getWorkflow().getLib();
-            if (!StringUtils.isEmpty(libPath)) {
-                String path = libPath.replace("${nameNode}", "");
-                properties.put(OozieClient.LIBPATH, "${nameNode}" + path);
-            }
-
-            if (entity.getInputs() != null) {
-                for (Input in : entity.getInputs().getInputs()) {
-                    if (in.isOptional()) {
-                        addOptionalInputProperties(properties, in, clusterName);
-                    }
-                }
-            }
-            propertiesMap.put(clusterName, properties);
-        }
-        return propertiesMap;
-    }
-
-    private void addOptionalInputProperties(Properties properties, Input in, String clusterName)
-        throws FalconException {
-        Feed feed = EntityUtil.getEntity(EntityType.FEED, in.getFeed());
-        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, clusterName);
-        String inName = in.getName();
-        properties.put(inName + ".frequency", String.valueOf(feed.getFrequency().getFrequency()));
-        properties.put(inName + ".freq_timeunit", mapToCoordTimeUnit(feed.getFrequency().getTimeUnit()).name());
-        properties.put(inName + ".timezone", feed.getTimezone().getID());
-        properties.put(inName + ".end_of_duration", Timeunit.NONE.name());
-        properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
-        properties.put(inName + ".done-flag", "notused");
-
-        String locPath = FeedHelper.createStorage(clusterName, feed)
-                .getUriTemplate(LocationType.DATA).replace('$', '%');
-        properties.put(inName + ".uri-template", locPath);
-
-        properties.put(inName + ".start-instance", in.getStart());
-        properties.put(inName + ".end-instance", in.getEnd());
-    }
-
-    private Timeunit mapToCoordTimeUnit(TimeUnit tu) {
-        switch (tu) {
-        case days:
-            return Timeunit.DAY;
-
-        case hours:
-            return Timeunit.HOUR;
-
-        case minutes:
-            return Timeunit.MINUTE;
-
-        case months:
-            return Timeunit.MONTH;
-
-        default:
-            throw new IllegalArgumentException("Unhandled time unit " + tu);
-        }
-    }
-
-    @Override
-    public Date getNextStartTime(Process process, String cluster, Date now) throws FalconException {
-        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
-        return EntityUtil.getNextStartTime(processCluster.getValidity().getStart(),
-                process.getFrequency(), process.getTimezone(), now);
-    }
-
-    @Override
-    public String[] getWorkflowNames() {
-        return new String[]{EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString()};
-    }
-
-    private static final String DEFAULT_WF_TEMPLATE = "/config/workflow/process-parent-workflow.xml";
-    private static final int THIRTY_MINUTES = 30 * 60 * 1000;
-
-    @Override
-    public List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
-
-            //Copy user workflow and lib to staging dir
-            Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),
-                new Path(bundlePath, EntityUtil.PROCESS_USER_DIR));
-            if (entity.getWorkflow().getLib() != null && fs.exists(new Path(entity.getWorkflow().getLib()))) {
-                checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getLib()),
-                    new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR)));
-            }
-
-            writeChecksums(fs, new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
-        } catch (IOException e) {
-            throw new FalconException("Failed to copy user workflow/lib", e);
-        }
-
-        List<COORDINATORAPP> apps = new ArrayList<COORDINATORAPP>();
-        apps.add(createDefaultCoordinator(cluster, bundlePath));
-
-        return apps;
-    }
-
-    private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
-        try {
-            FSDataOutputStream stream = fs.create(path);
-            try {
-                for (Map.Entry<String, String> entry : checksums.entrySet()) {
-                    stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-                }
-            } finally {
-                stream.close();
-            }
-        } catch (IOException e) {
-            throw new FalconException("Failed to copy user workflow/lib", e);
-        }
-    }
-
-    private Path getUserWorkflowPath(Cluster cluster, Path bundlePath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
-            Path wfPath = new Path(entity.getWorkflow().getPath());
-            if (fs.isFile(wfPath)) {
-                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
-            } else {
-                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR);
-            }
-        } catch(IOException e) {
-            throw new FalconException("Failed to get workflow path", e);
-        }
-    }
-
-    private Path getUserLibPath(Cluster cluster, Path bundlePath) throws FalconException {
-        try {
-            if (entity.getWorkflow().getLib() == null) {
-                return null;
-            }
-            Path libPath = new Path(entity.getWorkflow().getLib());
-
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
-            if (fs.isFile(libPath)) {
-                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
-            } else {
-                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR);
-            }
-        } catch(IOException e) {
-            throw new FalconException("Failed to get user lib path", e);
-        }
-    }
-
-    /**
-     * Creates default oozie coordinator.
-     *
-     * @param cluster    - Cluster for which the coordinator app need to be created
-     * @param bundlePath - bundle path
-     * @return COORDINATORAPP
-     * @throws FalconException on Error
-     */
-    public COORDINATORAPP createDefaultCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
-        if (entity == null) {
-            return null;
-        }
-
-        COORDINATORAPP coord = new COORDINATORAPP();
-        String coordName = EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString();
-        Path coordPath = getCoordPath(bundlePath, coordName);
-
-        // coord attributes
-        initializeCoordAttributes(cluster, entity, coord, coordName);
-
-        CONTROLS controls = initializeControls(entity); // controls
-        coord.setControls(controls);
-
-        // Configuration
-        Map<String, String> props = createCoordDefaultConfiguration(cluster, coordName);
-
-        initializeInputPaths(cluster, entity, coord, props); // inputs
-        initializeOutputPaths(cluster, entity, coord, props);  // outputs
-
-        Workflow processWorkflow = entity.getWorkflow();
-        propagateUserWorkflowProperties(processWorkflow, props, entity.getName());
-
-        // create parent wf
-        createWorkflow(cluster, entity, processWorkflow, coordName, coordPath);
-
-        WORKFLOW wf = new WORKFLOW();
-        wf.setAppPath(getStoragePath(coordPath.toString()));
-        wf.setConfiguration(getCoordConfig(props));
-
-        // set coord action to parent wf
-        org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
-        action.setWorkflow(wf);
-        coord.setAction(action);
-
-        return coord;
-    }
-
-    private void initializeCoordAttributes(Cluster cluster, Process process, COORDINATORAPP coord, String coordName) {
-        coord.setName(coordName);
-        org.apache.falcon.entity.v0.process.Cluster processCluster =
-            ProcessHelper.getCluster(process, cluster.getName());
-        coord.setStart(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()));
-        coord.setEnd(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()));
-        coord.setTimezone(process.getTimezone().getID());
-        coord.setFrequency("${coord:" + process.getFrequency().toString() + "}");
-    }
-
-    private CONTROLS initializeControls(Process process)
-        throws FalconException {
-        CONTROLS controls = new CONTROLS();
-        controls.setConcurrency(String.valueOf(process.getParallel()));
-        controls.setExecution(process.getOrder().name());
-
-        Frequency timeout = process.getTimeout();
-        long frequencyInMillis = ExpressionHelper.get().evaluate(process.getFrequency().toString(), Long.class);
-        long timeoutInMillis;
-        if (timeout != null) {
-            timeoutInMillis = ExpressionHelper.get().
-                evaluate(process.getTimeout().toString(), Long.class);
-        } else {
-            timeoutInMillis = frequencyInMillis * 6;
-            if (timeoutInMillis < THIRTY_MINUTES) {
-                timeoutInMillis = THIRTY_MINUTES;
-            }
-        }
-        controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
-
-        if (timeoutInMillis / frequencyInMillis * 2 > 0) {
-            controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
-        }
-
-        return controls;
-    }
-
-    private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
-        Map<String, String> props) throws FalconException {
-        if (process.getInputs() == null) {
-            props.put("falconInputFeeds", "NONE");
-            props.put("falconInPaths", IGNORE);
-            return;
-        }
-
-        List<String> inputFeeds = new ArrayList<String>();
-        List<String> inputPaths = new ArrayList<String>();
-        List<String> inputFeedStorageTypes = new ArrayList<String>();
-        for (Input input : process.getInputs().getInputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-
-            if (!input.isOptional()) {
-                if (coord.getDatasets() == null) {
-                    coord.setDatasets(new DATASETS());
-                }
-                if (coord.getInputEvents() == null) {
-                    coord.setInputEvents(new INPUTEVENTS());
-                }
-
-                SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
-                coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-
-                DATAIN datain = createDataIn(input);
-                coord.getInputEvents().getDataIn().add(datain);
-            }
-
-            String inputExpr = null;
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
-                props.put(input.getName(), inputExpr);
-            } else if (storage.getType() == Storage.TYPE.TABLE) {
-                inputExpr = "${coord:dataIn('" + input.getName() + "')}";
-                propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
-            }
-
-            inputFeeds.add(feed.getName());
-            inputPaths.add(inputExpr);
-            inputFeedStorageTypes.add(storage.getType().name());
-        }
-
-        propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
-    }
-
-    private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
-                                             List<String> inputFeedStorageTypes, Map<String, String> props) {
-        // populate late data handler - should-record action
-        props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
-        props.put("falconInPaths", join(inputPaths.iterator(), '#'));
-
-        // storage type for each corresponding feed sent as a param to LateDataHandler
-        // needed to compute usage based on storage type in LateDataHandler
-        props.put("falconInputFeedStorageTypes", join(inputFeedStorageTypes.iterator(), '#'));
-    }
-
-    private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
-                                       Map<String, String> props) throws FalconException {
-        if (process.getOutputs() == null) {
-            props.put(ARG.feedNames.getPropName(), "NONE");
-            props.put(ARG.feedInstancePaths.getPropName(), IGNORE);
-            return;
-        }
-
-        if (coord.getDatasets() == null) {
-            coord.setDatasets(new DATASETS());
-        }
-
-        if (coord.getOutputEvents() == null) {
-            coord.setOutputEvents(new OUTPUTEVENTS());
-        }
-
-        List<String> outputFeeds = new ArrayList<String>();
-        List<String> outputPaths = new ArrayList<String>();
-        for (Output output : process.getOutputs().getOutputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-
-            SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
-            coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-
-            DATAOUT dataout = createDataOut(output);
-            coord.getOutputEvents().getDataOut().add(dataout);
-
-            String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
-            outputFeeds.add(feed.getName());
-            outputPaths.add(outputExpr);
-
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                props.put(output.getName(), outputExpr);
-
-                propagateFileSystemProperties(output, feed, cluster, coord, storage, props);
-            } else if (storage.getType() == Storage.TYPE.TABLE) {
-                propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
-            }
-        }
-
-        // Output feed name and path for parent workflow
-        props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
-        props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
-    }
-
-    private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
-        String datasetName, LocationType locationType) throws FalconException {
-
-        SYNCDATASET syncdataset = new SYNCDATASET();
-        syncdataset.setName(datasetName);
-        syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-
-        String uriTemplate = storage.getUriTemplate(locationType);
-        if (storage.getType() == Storage.TYPE.TABLE) {
-            uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
-        }
-        syncdataset.setUriTemplate(uriTemplate);
-
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-        syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
-        syncdataset.setTimezone(feed.getTimezone().getID());
-
-        if (feed.getAvailabilityFlag() == null) {
-            syncdataset.setDoneFlag("");
-        } else {
-            syncdataset.setDoneFlag(feed.getAvailabilityFlag());
-        }
-
-        return syncdataset;
-    }
-
-    private DATAOUT createDataOut(Output output) {
-        DATAOUT dataout = new DATAOUT();
-        dataout.setName(output.getName());
-        dataout.setDataset(output.getName());
-        dataout.setInstance(getELExpression(output.getInstance()));
-        return dataout;
-    }
-
-    private DATAIN createDataIn(Input input) {
-        DATAIN datain = new DATAIN();
-        datain.setName(input.getName());
-        datain.setDataset(input.getName());
-        datain.setStartInstance(getELExpression(input.getStart()));
-        datain.setEndInstance(getELExpression(input.getEnd()));
-        return datain;
-    }
-
-    private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
-        Storage storage, Map<String, String> props)
-        throws FalconException {
-
-        // stats and meta paths
-        createOutputEvent(output, feed, cluster, LocationType.STATS, coord, props, storage);
-        createOutputEvent(output, feed, cluster, LocationType.META, coord, props, storage);
-        createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
-    }
-
-    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
-        COORDINATORAPP coord, Map<String, String> props, Storage storage)
-        throws FalconException {
-
-        String name = output.getName();
-        String type = locType.name().toLowerCase();
-
-        SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
-        coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
-
-        DATAOUT dataout = new DATAOUT();
-        dataout.setName(name + type);
-        dataout.setDataset(name + type);
-        dataout.setInstance(getELExpression(output.getInstance()));
-
-        OUTPUTEVENTS outputEvents = coord.getOutputEvents();
-        if (outputEvents == null) {
-            outputEvents = new OUTPUTEVENTS();
-            coord.setOutputEvents(outputEvents);
-        }
-        outputEvents.getDataOut().add(dataout);
-
-        String outputExpr = "${coord:dataOut('" + name + type + "')}";
-        props.put(name + "." + type, outputExpr);
-    }
-    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
-
-    private void propagateCommonCatalogTableProperties(CatalogStorage tableStorage,
-        Map<String, String> props, String prefix) {
-        props.put(prefix + "_storage_type", tableStorage.getType().name());
-        props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
-        props.put(prefix + "_database", tableStorage.getDatabase());
-        props.put(prefix + "_table", tableStorage.getTable());
-    }
-
-    private void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage,
-        Map<String, String> props) {
-        String prefix = "falcon_" + input.getName();
-
-        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
-
-        props.put(prefix + "_partition_filter_pig",
-            "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
-        props.put(prefix + "_partition_filter_hive",
-            "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
-        props.put(prefix + "_partition_filter_java",
-            "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
-        props.put(prefix + "_datain_partitions_hive",
-            "${coord:dataInPartitions('" + input.getName() + "', 'hive-export')}");
-    }
-
-    private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,
-                                                 Map<String, String> props) {
-        String prefix = "falcon_" + output.getName();
-
-        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
-
-        props.put(prefix + "_dataout_partitions",
-            "${coord:dataOutPartitions('" + output.getName() + "')}");
-        props.put(prefix + "_dated_partition_value", "${coord:dataOutPartitionValue('"
-            + output.getName() + "', '" + tableStorage.getDatedPartitionKey() + "')}");
-    }
-
-    private String join(Iterator<String> itr, char sep) {
-        String joinedStr = StringUtils.join(itr, sep);
-        if (joinedStr.isEmpty()) {
-            joinedStr = "null";
-        }
-        return joinedStr;
-    }
-
-    private String getELExpression(String expr) {
-        if (expr != null) {
-            expr = "${" + expr + "}";
-        }
-        return expr;
-    }
-
-    @Override
-    protected Map<String, String> getEntityProperties() {
-        Map<String, String> props = new HashMap<String, String>();
-        if (entity.getProperties() != null) {
-            for (Property prop : entity.getProperties().getProperties()) {
-                props.put(prop.getName(), prop.getValue());
-            }
-        }
-        return props;
-    }
-
-    private void propagateUserWorkflowProperties(Workflow processWorkflow,
-        Map<String, String> props, String processName) {
-        props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
-            processWorkflow.getName(), processName));
-        props.put("userWorkflowVersion", processWorkflow.getVersion());
-        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
-    }
-
-    protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
-                                  String wfName, Path parentWfPath) throws FalconException {
-        WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
-        wfApp.setName(wfName);
-        try {
-            addLibExtensionsToWorkflow(cluster, wfApp, EntityType.PROCESS, null);
-        } catch (IOException e) {
-            throw new FalconException("Failed to add library extensions for the workflow", e);
-        }
-
-        final boolean shouldConfigureHive = shouldSetupHiveConfiguration(cluster, process);
-        if (shouldConfigureHive) {
-            setupHiveCredentials(cluster, parentWfPath, wfApp);
-        }
-
-        String userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent()).toString();
-        EngineType engineType = processWorkflow.getEngine();
-        for (Object object : wfApp.getDecisionOrForkOrJoin()) {
-            if (!(object instanceof ACTION)) {
-                continue;
-            }
-
-            ACTION action = (ACTION) object;
-            String actionName = action.getName();
-            if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
-                action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
-            } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
-                decoratePIGAction(cluster, process, action.getPig(), parentWfPath, shouldConfigureHive);
-            } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
-                decorateHiveAction(cluster, process, action, parentWfPath);
-            } else if (FALCON_ACTIONS.contains(actionName)) {
-                decorateWithOozieRetries(action);
-                if (shouldConfigureHive && actionName.equals("recordsize")) {
-                    // adds hive-site.xml in actions classpath
-                    action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
-                }
-            }
-        }
-
-        //Create parent workflow
-        marshal(cluster, wfApp, parentWfPath);
-    }
-
-    protected boolean shouldSetupHiveConfiguration(Cluster cluster,
-                                                   Process process) throws FalconException {
-        return isTableStorageType(cluster, entity)
-                || EngineType.HIVE == process.getWorkflow().getEngine();
-    }
-
-    protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
-        Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
-        return Storage.TYPE.TABLE == storageType;
-    }
-
-    private void setupHiveCredentials(Cluster cluster, Path parentWfPath,
-                                      WORKFLOWAPP wfApp) throws FalconException {
-        // create hive-site.xml file so actions can use it in the classpath
-        createHiveConfiguration(cluster, parentWfPath, ""); // DO NOT ADD PREFIX!!!
-
-        if (isSecurityEnabled) {
-            // add hcatalog credentials for secure mode and add a reference to each action
-            addHCatalogCredentials(wfApp, cluster, HIVE_CREDENTIAL_NAME, FALCON_PROCESS_HIVE_ACTIONS);
-        }
-    }
-
-    private void decoratePIGAction(Cluster cluster, Process process, PIG pigAction,
-                                   Path parentWfPath, boolean shouldConfigureHive) throws FalconException {
-        Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
-        pigAction.setScript("${nameNode}" + userWfPath.toString());
-
-        addPrepareDeleteOutputPath(process, pigAction);
-
-        final List<String> paramList = pigAction.getParam();
-        addInputFeedsAsParams(paramList, process, cluster, EngineType.PIG.name().toLowerCase());
-        addOutputFeedsAsParams(paramList, process, cluster);
-
-        propagateProcessProperties(pigAction, process);
-
-        if (shouldConfigureHive) { // adds hive-site.xml in pig classpath
-            pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
-        }
-
-        addArchiveForCustomJars(cluster, pigAction.getArchive(),
-            getUserLibPath(cluster, parentWfPath.getParent()));
-    }
-
-    private void decorateHiveAction(Cluster cluster, Process process, ACTION wfAction,
-                                    Path parentWfPath) throws FalconException {
-
-        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(wfAction);
-        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
-
-        Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
-        hiveAction.setScript("${nameNode}" + userWfPath.toString());
-
-        addPrepareDeleteOutputPath(process, hiveAction);
-
-        final List<String> paramList = hiveAction.getParam();
-        addInputFeedsAsParams(paramList, process, cluster, EngineType.HIVE.name().toLowerCase());
-        addOutputFeedsAsParams(paramList, process, cluster);
-
-        propagateProcessProperties(hiveAction, process);
-
-        // adds hive-site.xml in hive classpath
-        hiveAction.setJobXml("${wf:appPath()}/conf/hive-site.xml");
-
-        addArchiveForCustomJars(cluster, hiveAction.getArchive(),
-            getUserLibPath(cluster, parentWfPath.getParent()));
-
-        OozieUtils.marshalHiveAction(wfAction, actionJaxbElement);
-    }
-
-    private void addPrepareDeleteOutputPath(Process process,
-                                            PIG pigAction) throws FalconException {
-        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
-        if (deleteOutputPathList.isEmpty()) {
-            return;
-        }
-
-        final PREPARE prepare = new PREPARE();
-        final List<DELETE> deleteList = prepare.getDelete();
-
-        for (String deletePath : deleteOutputPathList) {
-            final DELETE delete = new DELETE();
-            delete.setPath(deletePath);
-            deleteList.add(delete);
-        }
-
-        if (!deleteList.isEmpty()) {
-            pigAction.setPrepare(prepare);
-        }
-    }
-
-    private void addPrepareDeleteOutputPath(Process process, org.apache.falcon.oozie.hive.ACTION hiveAction)
-        throws FalconException {
-
-        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
-        if (deleteOutputPathList.isEmpty()) {
-            return;
-        }
-
-        org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
-        List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
-
-        for (String deletePath : deleteOutputPathList) {
-            org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
-            delete.setPath(deletePath);
-            deleteList.add(delete);
-        }
-
-        if (!deleteList.isEmpty()) {
-            hiveAction.setPrepare(prepare);
-        }
-    }
-
-    private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
-        final List<String> deleteList = new ArrayList<String>();
-        if (process.getOutputs() == null) {
-            return deleteList;
-        }
-
-        for (Output output : process.getOutputs().getOutputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-
-            if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
-                continue; // prepare delete only applies to FileSystem storage
-            }
-
-            deleteList.add("${wf:conf('" + output.getName() + "')}");
-        }
-
-        return deleteList;
-    }
-
-    private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
-                                       String engineType) throws FalconException {
-        if (process.getInputs() == null) {
-            return;
-        }
-
-        for (Input input : process.getInputs().getInputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-
-            final String inputName = input.getName();
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
-            } else if (storage.getType() == Storage.TYPE.TABLE) {
-                final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
-                Map<String, String> props = new HashMap<String, String>();
-                propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
-                for (String key : props.keySet()) {
-                    paramList.add(key + "=${wf:conf('" + key + "')}");
-                }
-
-                paramList.add(paramName + "_filter=${wf:conf('"
-                    + paramName + "_partition_filter_" + engineType + "')}");
-            }
-        }
-    }
-
-    private void addOutputFeedsAsParams(List<String> paramList, Process process,
-                                        Cluster cluster) throws FalconException {
-        if (process.getOutputs() == null) {
-            return;
-        }
-
-        for (Output output : process.getOutputs().getOutputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                final String outputName = output.getName();  // no prefix for backwards compatibility
-                paramList.add(outputName + "=${" + outputName + "}");
-            } else if (storage.getType() == Storage.TYPE.TABLE) {
-                Map<String, String> props = new HashMap<String, String>();
-                propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
-                for (String key : props.keySet()) {
-                    paramList.add(key + "=${wf:conf('" + key + "')}");
-                }
-            }
-        }
-    }
-
-    private void propagateProcessProperties(PIG pigAction, Process process) {
-        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
-        if (processProperties == null) {
-            return;
-        }
-
-        // Propagate user defined properties to job configuration
-        final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration =
-            pigAction.getConfiguration().getProperty();
-
-        // Propagate user defined properties to pig script as macros
-        // passed as parameters -p name=value that can be accessed as $name
-        final List<String> paramList = pigAction.getParam();
-
-        for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
-            org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
-                new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
-            configProperty.setName(property.getName());
-            configProperty.setValue(property.getValue());
-            configuration.add(configProperty);
-
-            paramList.add(property.getName() + "=" + property.getValue());
-        }
-    }
-
-    private void propagateProcessProperties(org.apache.falcon.oozie.hive.ACTION hiveAction, Process process) {
-        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
-        if (processProperties == null) {
-            return;
-        }
-
-        // Propagate user defined properties to job configuration
-        final List<org.apache.falcon.oozie.hive.CONFIGURATION.Property> configuration =
-            hiveAction.getConfiguration().getProperty();
-
-        // Propagate user defined properties to the hive script,
-        // passed as name=value parameters
-        final List<String> paramList = hiveAction.getParam();
-
-        for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
-            org.apache.falcon.oozie.hive.CONFIGURATION.Property configProperty =
-                new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
-            configProperty.setName(property.getName());
-            configProperty.setValue(property.getValue());
-            configuration.add(configProperty);
-
-            paramList.add(property.getName() + "=" + property.getValue());
-        }
-    }
-
-    private void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
-                                         Path libPath) throws FalconException {
-        if (libPath == null) {
-            return;
-        }
-
-        try {
-            final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
-            if (fs.isFile(libPath)) {  // File, not a Dir
-                archiveList.add(libPath.toString());
-                return;
-            }
-
-            // lib path is a directory, add each file under the lib dir to archive
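-            // (the filter below accepts only regular files ending in .jar)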
-            final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() {
-                @Override
-                public boolean accept(Path path) {
-                    try {
-                        return fs.isFile(path) && path.getName().endsWith(".jar");
-                    } catch (IOException ignore) {
-                        return false;
-                    }
-                }
-            });
-
-            for (FileStatus fileStatus : fileStatuses) {
-                archiveList.add(fileStatus.getPath().toString());
-            }
-        } catch (IOException e) {
-            throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/process/src/main/resources/config/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/process/src/main/resources/config/workflow/process-parent-workflow.xml b/process/src/main/resources/config/workflow/process-parent-workflow.xml
deleted file mode 100644
index 4a2495c..0000000
--- a/process/src/main/resources/config/workflow/process-parent-workflow.xml
+++ /dev/null
@@ -1,278 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-process-parent-workflow'>
-    <start to='should-record'/>
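-    <!-- When shouldRecord is true, capture late-data metrics for the inputs before running the user workflow -->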
-    <decision name='should-record'>
-        <switch>
-            <case to="recordsize">
-                ${shouldRecord=="true"}
-            </case>
-            <default to="user-workflow"/>
-        </switch>
-    </decision>
-    <action name='recordsize'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-                <!-- HCatalog jars -->
-                <property>
-                    <name>oozie.action.sharelib.for.java</name>
-                    <value>hcatalog</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
-            <arg>-out</arg>
-            <arg>${logDir}/latedata/${nominalTime}</arg>
-            <arg>-paths</arg>
-            <arg>${falconInPaths}</arg>
-            <arg>-falconInputFeeds</arg>
-            <arg>${falconInputFeeds}</arg>
-            <arg>-falconInputFeedStorageTypes</arg>
-            <arg>${falconInputFeedStorageTypes}</arg>
-            <capture-output/>
-        </java>
-        <ok to="user-workflow"/>
-        <error to="failed-post-processing"/>
-    </action>
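-    <!-- Dispatch to the user's workflow by engine type: oozie sub-workflow, pig script or hive script -->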
-    <decision name='user-workflow'>
-        <switch>
-            <case to="user-oozie-workflow">
-                ${userWorkflowEngine=="oozie"}
-            </case>
-            <case to="user-pig-job">
-                ${userWorkflowEngine=="pig"}
-            </case>
-            <case to="user-hive-job">
-                ${userWorkflowEngine=="hive"}
-            </case>
-            <default to="user-oozie-workflow"/>
-        </switch>
-    </decision>
-    <action name='user-pig-job'>
-        <pig>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-                <property>
-                    <name>oozie.action.sharelib.for.pig</name>
-                    <value>pig,hcatalog</value>
-                </property>
-            </configuration>
-            <script>#USER_WF_PATH#</script>
-        </pig>
-        <ok to="succeeded-post-processing"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <action name="user-hive-job">
-        <hive xmlns="uri:oozie:hive-action:0.2">
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <script>#USER_WF_PATH#</script>
-        </hive>
-        <ok to="succeeded-post-processing"/>
-        <error to="failed-post-processing"/>
-    </action>
-    <action name='user-oozie-workflow'>
-        <sub-workflow>
-            <app-path>#USER_WF_PATH#</app-path>
-            <propagate-configuration/>
-        </sub-workflow>
-        <ok to="succeeded-post-processing"/>
-        <error to="failed-post-processing"/>
-    </action>
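-    <!-- Post-processing reports the run status (SUCCEEDED or FAILED) over JMS and records instance paths -->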
-    <action name='succeeded-post-processing'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
-            <arg>-cluster</arg>
-            <arg>${cluster}</arg>
-            <arg>-entityType</arg>
-            <arg>${entityType}</arg>
-            <arg>-entityName</arg>
-            <arg>${entityName}</arg>
-            <arg>-nominalTime</arg>
-            <arg>${nominalTime}</arg>
-            <arg>-operation</arg>
-            <arg>GENERATE</arg>
-            <arg>-workflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-runId</arg>
-            <arg>${wf:run()}</arg>
-            <arg>-status</arg>
-            <arg>SUCCEEDED</arg>
-            <arg>-timeStamp</arg>
-            <arg>${timeStamp}</arg>
-            <arg>-brokerImplClass</arg>
-            <arg>${wf:conf("broker.impl.class")}</arg>
-            <arg>-brokerUrl</arg>
-            <arg>${wf:conf("broker.url")}</arg>
-            <arg>-userBrokerImplClass</arg>
-            <arg>${userBrokerImplClass}</arg>
-            <arg>-userBrokerUrl</arg>
-            <arg>${userBrokerUrl}</arg>
-            <arg>-brokerTTL</arg>
-            <arg>${wf:conf("broker.ttlInMins")}</arg>
-            <arg>-feedNames</arg>
-            <arg>${feedNames}</arg>
-            <arg>-feedInstancePaths</arg>
-            <arg>${feedInstancePaths}</arg>
-            <arg>-logFile</arg>
-            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
-            <arg>-workflowEngineUrl</arg>
-            <arg>${workflowEngineUrl}</arg>
-            <arg>-subflowId</arg>
-            <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-oozie-workflow" : ""}</arg>
-            <arg>-userWorkflowEngine</arg>
-            <arg>${userWorkflowEngine}</arg>
-            <arg>-userWorkflowName</arg>
-            <arg>${userWorkflowName}</arg>
-            <arg>-userWorkflowVersion</arg>
-            <arg>${userWorkflowVersion}</arg>
-            <arg>-logDir</arg>
-            <arg>${logDir}/job-${nominalTime}/</arg>
-            <arg>-workflowUser</arg>
-            <arg>${wf:user()}</arg>
-            <arg>-falconInputFeeds</arg>
-            <arg>${falconInputFeeds}</arg>
-            <arg>-falconInputPaths</arg>
-            <arg>${falconInPaths}</arg>
-            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
-            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
-            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
-            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
-            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
-            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
-        </java>
-        <ok to="end"/>
-        <error to="fail"/>
-    </action>
-    <action name='failed-post-processing'>
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <configuration>
-                <property>
-                    <name>mapred.job.queue.name</name>
-                    <value>${queueName}</value>
-                </property>
-                <property>
-                    <name>oozie.launcher.mapred.job.priority</name>
-                    <value>${jobPriority}</value>
-                </property>
-            </configuration>
-            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
-            <arg>-cluster</arg>
-            <arg>${cluster}</arg>
-            <arg>-entityType</arg>
-            <arg>${entityType}</arg>
-            <arg>-entityName</arg>
-            <arg>${entityName}</arg>
-            <arg>-nominalTime</arg>
-            <arg>${nominalTime}</arg>
-            <arg>-operation</arg>
-            <arg>GENERATE</arg>
-            <arg>-workflowId</arg>
-            <arg>${wf:id()}</arg>
-            <arg>-runId</arg>
-            <arg>${wf:run()}</arg>
-            <arg>-status</arg>
-            <arg>FAILED</arg>
-            <arg>-timeStamp</arg>
-            <arg>${timeStamp}</arg>
-            <arg>-brokerImplClass</arg>
-            <arg>${wf:conf("broker.impl.class")}</arg>
-            <arg>-brokerUrl</arg>
-            <arg>${wf:conf("broker.url")}</arg>
-            <arg>-userBrokerImplClass</arg>
-            <arg>${userBrokerImplClass}</arg>
-            <arg>-userBrokerUrl</arg>
-            <arg>${userBrokerUrl}</arg>
-            <arg>-brokerTTL</arg>
-            <arg>${wf:conf("broker.ttlInMins")}</arg>
-            <arg>-feedNames</arg>
-            <arg>${feedNames}</arg>
-            <arg>-feedInstancePaths</arg>
-            <arg>${feedInstancePaths}</arg>
-            <arg>-logFile</arg>
-            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
-            <arg>-workflowEngineUrl</arg>
-            <arg>${workflowEngineUrl}</arg>
-            <arg>-subflowId</arg>
-            <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-oozie-workflow" : ""}</arg>
-            <arg>-userWorkflowEngine</arg>
-            <arg>${userWorkflowEngine}</arg>
-            <arg>-logDir</arg>
-            <arg>${logDir}/job-${nominalTime}/</arg>
-            <arg>-workflowUser</arg>
-            <arg>${wf:user()}</arg>
-            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
-            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
-            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
-            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
-            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
-            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
-        </java>
-        <ok to="fail"/>
-        <error to="fail"/>
-    </action>
-    <kill name="fail">
-        <message>
-            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-        </message>
-    </kill>
-    <end name='end'/>
-</workflow-app>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/process/src/test/java/org/apache/falcon/converter/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/AbstractTestBase.java b/process/src/test/java/org/apache/falcon/converter/AbstractTestBase.java
deleted file mode 100644
index 2c7ee8b..0000000
--- a/process/src/test/java/org/apache/falcon/converter/AbstractTestBase.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
-
-import javax.xml.bind.Unmarshaller;
-
-/**
- * Base for falcon unit tests involving configuration store.
- */
-public class AbstractTestBase {
-    private static final String PROCESS_XML = "/config/process/process-0.1.xml";
-    private static final String FEED_XML = "/config/feed/feed-0.1.xml";
-    private static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
-    private static final String PIG_PROCESS_XML = "/config/process/pig-process-0.1.xml";
-
-    protected void storeEntity(EntityType type, String name, String resource) throws Exception {
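-        // unmarshal the entity from the classpath resource, rename it and (re)publish it to the store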
-        Unmarshaller unmarshaller = type.getUnmarshaller();
-        ConfigurationStore store = ConfigurationStore.get();
-        store.remove(type, name);
-        switch (type) {
-        case CLUSTER:
-            Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(resource));
-            cluster.setName(name);
-            store.publish(type, cluster);
-            break;
-
-        case FEED:
-            Feed feed = (Feed) unmarshaller.unmarshal(this.getClass().getResource(resource));
-            feed.setName(name);
-            store.publish(type, feed);
-            break;
-
-        case PROCESS:
-            Process process = (Process) unmarshaller.unmarshal(this.getClass().getResource(resource));
-            process.setName(name);
-            store.publish(type, process);
-            break;
-
-        default: // no other entity types are stored by these tests
-        }
-    }
-
-    public void setup() throws Exception {
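-        // seed the config store with the cluster, feeds and processes the tests expect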
-        storeEntity(EntityType.CLUSTER, "corp", CLUSTER_XML);
-        storeEntity(EntityType.FEED, "clicks", FEED_XML);
-        storeEntity(EntityType.FEED, "impressions", FEED_XML);
-        storeEntity(EntityType.FEED, "clicksummary", FEED_XML);
-        storeEntity(EntityType.PROCESS, "clicksummary", PROCESS_XML);
-        storeEntity(EntityType.PROCESS, "pig-process", PIG_PROCESS_XML);
-    }
-
-    public void cleanup() throws Exception {
-        ConfigurationStore store = ConfigurationStore.get();
-        store.remove(EntityType.PROCESS, "pig-process");
-        store.remove(EntityType.PROCESS, "clicksummary");
-        store.remove(EntityType.FEED, "clicksummary");
-        store.remove(EntityType.FEED, "impressions");
-        store.remove(EntityType.FEED, "clicks");
-        store.remove(EntityType.CLUSTER, "corp");
-    }
-}