You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/05/21 20:24:57 UTC

git commit: FALCON-443 Process with Hive workflow engine and filesystem input feeds, table output feed fails. Contributed by Sowmya Ramesh

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 36d01791c -> 2129285dc


FALCON-443 Process with Hive workflow engine and filesystem input feeds, table output feed fails. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/2129285d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/2129285d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/2129285d

Branch: refs/heads/master
Commit: 2129285dccb720f0f1f405b52474b9df61ee1708
Parents: 36d0179
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed May 21 11:24:48 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed May 21 11:24:48 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   9 +-
 .../org/apache/falcon/entity/ProcessHelper.java |  14 +-
 .../workflow/OozieFeedWorkflowBuilder.java      |  10 +-
 .../falcon/workflow/OozieWorkflowBuilder.java   |  24 +--
 .../workflow/OozieProcessWorkflowBuilder.java   |  23 ++-
 .../OozieProcessWorkflowBuilderTest.java        | 171 +++++++++++++++++++
 .../config/process/dumb-hive-process.xml        |  39 +++++
 .../config/process/hive-process-FSInputFeed.xml |  46 +++++
 .../process/hive-process-FSOutputFeed.xml       |  46 +++++
 9 files changed, 349 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1efb4a4..fccc9b8 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,11 +13,17 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-443 Process with Hive workflow engine and filesystem input feeds,
+   table output feed fails (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-382 Error While building Latest trunk code with Hadoop 2.2.0. (Suhas Vasu)
 
    FALCON-240 Instance status from CLI on a feed doesn't give the retention details. 
    (pavan kumar kolamuri via Shwetha GS)
 
+   FALCON-441 Lineage capture fails for feeds with multiple instances
+   (Venkatesh Seetharam)
+
 Release Version: 0.5-incubating
   INCOMPATIBLE CHANGES
     FALCON-11 Add support for security in Falcon (Venkatesh Seetharam)
@@ -148,9 +154,6 @@ Release Version: 0.5-incubating
     FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
 
   BUG FIXES
-    FALCON-441 Lineage capture fails for feeds with multiple instances
-    (Venkatesh Seetharam)
-
     FALCON-440 Exclude IDEA IntelliJ and other unnecessary files from source
     distribution (Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index a0a74e4..ece8982 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -24,6 +24,7 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
 
 /**
@@ -49,7 +50,7 @@ public final class ProcessHelper {
     public static Storage.TYPE getStorageType(org.apache.falcon.entity.v0.cluster.Cluster cluster,
                                               Process process) throws FalconException {
         Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
-        if (process.getInputs() == null) {
+        if (process.getInputs() == null && process.getOutputs() == null) {
             return storageType;
         }
 
@@ -61,6 +62,17 @@ public final class ProcessHelper {
             }
         }
 
+        // If input feeds storage type is file system check storage type of output feeds
+        if (Storage.TYPE.FILESYSTEM == storageType) {
+            for (Output output : process.getOutputs().getOutputs()) {
+                Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+                storageType = FeedHelper.getStorageType(feed, cluster);
+                if (Storage.TYPE.TABLE == storageType) {
+                    break;
+                }
+            }
+        }
+
         return storageType;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
index 16bff02..faaddac 100644
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
@@ -246,7 +246,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
                 addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
                 addOozieRetries(retWfApp);
 
-                if (isTableStorageType(cluster, entity)) {
+                if (shouldSetupHiveConfiguration(cluster, entity)) {
                     setupHiveCredentials(cluster, wfPath, retWfApp);
                 }
 
@@ -325,7 +325,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
 
             addOozieRetries(repWFapp);
 
-            if (isTableStorageType(targetCluster, entity)) {
+            if (shouldSetupHiveConfiguration(targetCluster, entity)) {
                 setupHiveCredentials(targetCluster, sourceCluster, repWFapp);
             }
 
@@ -719,4 +719,10 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
         }
         props.put("userWorkflowVersion", version);
     }
+
+    protected boolean shouldSetupHiveConfiguration(Cluster cluster,
+                                                   Feed feed) throws FalconException {
+        Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
+        return Storage.TYPE.TABLE == storageType;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index 7616df1..ad1af73 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -26,16 +26,11 @@ import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.ExternalId;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -550,21 +545,8 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
         }
     }
 
-    private boolean isTableStorageType(Cluster cluster, T entity) throws FalconException {
-        return entity.getEntityType() == EntityType.PROCESS
-                ? isTableStorageType(cluster, (Process) entity)
-                : isTableStorageType(cluster, (Feed) entity);
-    }
-
-    protected boolean isTableStorageType(Cluster cluster, Feed feed) throws FalconException {
-        Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
-        return Storage.TYPE.TABLE == storageType;
-    }
-
-    protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
-        Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
-        return Storage.TYPE.TABLE == storageType;
-    }
+    protected abstract boolean shouldSetupHiveConfiguration(Cluster cluster,
+                                                            T entity) throws FalconException;
 
     protected void decorateWithOozieRetries(ACTION action) {
         Properties props = RuntimeProperties.get();
@@ -588,7 +570,7 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
         properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
         properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, "working") + "/lib");
 
-        if (isTableStorageType(cluster, entity)) {
+        if (shouldSetupHiveConfiguration(cluster, entity)) {
             propagateHiveCredentials(cluster, properties);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 3d6bf7b..5089779 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -603,8 +603,8 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
             throw new FalconException("Failed to add library extensions for the workflow", e);
         }
 
-        final boolean isTableStorageType = isTableStorageType(cluster, process);
-        if (isTableStorageType) {
+        final boolean shouldConfigureHive = shouldSetupHiveConfiguration(cluster, process);
+        if (shouldConfigureHive) {
             setupHiveCredentials(cluster, parentWfPath, wfApp);
         }
 
@@ -620,12 +620,12 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
             if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
                 action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
             } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
-                decoratePIGAction(cluster, process, action.getPig(), parentWfPath, isTableStorageType);
+                decoratePIGAction(cluster, process, action.getPig(), parentWfPath, shouldConfigureHive);
             } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
                 decorateHiveAction(cluster, process, action, parentWfPath);
             } else if (FALCON_ACTIONS.contains(actionName)) {
                 decorateWithOozieRetries(action);
-                if (isTableStorageType && actionName.equals("recordsize")) {
+                if (shouldConfigureHive && actionName.equals("recordsize")) {
                     // adds hive-site.xml in actions classpath
                     action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
                 }
@@ -636,6 +636,17 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
         marshal(cluster, wfApp, parentWfPath);
     }
 
+    protected boolean shouldSetupHiveConfiguration(Cluster cluster,
+                                                   Process process) throws FalconException {
+        return isTableStorageType(cluster, entity)
+                || EngineType.HIVE == process.getWorkflow().getEngine();
+    }
+
+    protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
+        Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
+        return Storage.TYPE.TABLE == storageType;
+    }
+
     private void setupHiveCredentials(Cluster cluster, Path parentWfPath,
                                       WORKFLOWAPP wfApp) throws FalconException {
         // create hive-site.xml file so actions can use it in the classpath
@@ -648,7 +659,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     }
 
     private void decoratePIGAction(Cluster cluster, Process process, PIG pigAction,
-                                   Path parentWfPath, boolean isTableStorageType) throws FalconException {
+                                   Path parentWfPath, boolean shouldConfigureHive) throws FalconException {
         Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
         pigAction.setScript("${nameNode}" + userWfPath.toString());
 
@@ -660,7 +671,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
 
         propagateProcessProperties(pigAction, process);
 
-        if (isTableStorageType) { // adds hive-site.xml in pig classpath
+        if (shouldConfigureHive) { // adds hive-site.xml in pig classpath
             pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
index 54c1809..5f0efe7 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
@@ -317,6 +317,177 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
     }
 
+    @Test (dataProvider = "secureOptions")
+    public void testHiveProcessMapperWithFSInputFeedAndTableOutputFeed(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+        URL resource = this.getClass().getResource("/config/feed/feed-0.1.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+
+        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+        resource = this.getClass().getResource("/config/process/hive-process-FSInputFeed.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        prepare(process);
+        OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.map(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+        Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Assert.assertEquals(hiveAction.getScript(),
+                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
+        Assert.assertNull(hiveAction.getPrepare());
+        Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+        Assert.assertFalse(hiveAction.getParam().isEmpty());
+        Assert.assertEquals(7, hiveAction.getParam().size());
+
+        Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+        assertHCatCredentials(parentWorkflow, wfPath);
+
+        ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+    }
+
+    @Test (dataProvider = "secureOptions")
+    public void testHiveProcessMapperWithTableInputFeedAndFSOutputFeed(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+        resource = this.getClass().getResource("/config/feed/feed-0.1.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+
+        resource = this.getClass().getResource("/config/process/hive-process-FSOutputFeed.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        prepare(process);
+        OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.map(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+        Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Assert.assertEquals(hiveAction.getScript(),
+                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
+        Assert.assertNotNull(hiveAction.getPrepare());
+        Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+        Assert.assertFalse(hiveAction.getParam().isEmpty());
+        Assert.assertEquals(6, hiveAction.getParam().size());
+
+        Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+        assertHCatCredentials(parentWorkflow, wfPath);
+
+        ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+    }
+
+    @Test (dataProvider = "secureOptions")
+    public void testHiveProcessWithNoInputsAndOutputs(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+        URL resource = this.getClass().getResource("/config/process/dumb-hive-process.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        prepare(process);
+        OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.map(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+        Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Assert.assertEquals(hiveAction.getScript(),
+                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
+        Assert.assertNull(hiveAction.getPrepare());
+        Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+        Assert.assertTrue(hiveAction.getParam().isEmpty());
+
+        assertHCatCredentials(parentWorkflow, wfPath);
+
+        ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+    }
+
     private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
         Path hiveConfPath = new Path(wfPath, "conf/hive-site.xml");
         Assert.assertTrue(fs.exists(hiveConfPath));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/test/resources/config/process/dumb-hive-process.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/dumb-hive-process.xml b/process/src/test/resources/config/process/dumb-hive-process.xml
new file mode 100644
index 0000000..c504074
--- /dev/null
+++ b/process/src/test/resources/config/process/dumb-hive-process.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what = none -->
+
+    <!-- how -->
+    <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/test/resources/config/process/hive-process-FSInputFeed.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/hive-process-FSInputFeed.xml b/process/src/test/resources/config/process/hive-process-FSInputFeed.xml
new file mode 100644
index 0000000..d871377
--- /dev/null
+++ b/process/src/test/resources/config/process/hive-process-FSInputFeed.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/test/resources/config/process/hive-process-FSOutputFeed.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/hive-process-FSOutputFeed.xml b/process/src/test/resources/config/process/hive-process-FSOutputFeed.xml
new file mode 100644
index 0000000..23d96c3
--- /dev/null
+++ b/process/src/test/resources/config/process/hive-process-FSOutputFeed.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>