You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/05/21 20:24:57 UTC
git commit: FALCON-443 Process with Hive workflow engine and
filesystem input feeds, table output feed fails. Contributed by Sowmya Ramesh
Repository: incubator-falcon
Updated Branches:
refs/heads/master 36d01791c -> 2129285dc
FALCON-443 Process with Hive workflow engine and filesystem input feeds, table output feed fails. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/2129285d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/2129285d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/2129285d
Branch: refs/heads/master
Commit: 2129285dccb720f0f1f405b52474b9df61ee1708
Parents: 36d0179
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed May 21 11:24:48 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed May 21 11:24:48 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 9 +-
.../org/apache/falcon/entity/ProcessHelper.java | 14 +-
.../workflow/OozieFeedWorkflowBuilder.java | 10 +-
.../falcon/workflow/OozieWorkflowBuilder.java | 24 +--
.../workflow/OozieProcessWorkflowBuilder.java | 23 ++-
.../OozieProcessWorkflowBuilderTest.java | 171 +++++++++++++++++++
.../config/process/dumb-hive-process.xml | 39 +++++
.../config/process/hive-process-FSInputFeed.xml | 46 +++++
.../process/hive-process-FSOutputFeed.xml | 46 +++++
9 files changed, 349 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1efb4a4..fccc9b8 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,11 +13,17 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-443 Process with Hive workflow engine and filesystem input feeds,
+ table output feed fails (Sowmya Ramesh via Venkatesh Seetharam)
+
FALCON-382 Error While building Latest trunk code with Hadoop 2.2.0. (Suhas Vasu)
FALCON-240 Instance status from CLI on a feed doesn't give the retention details.
(pavan kumar kolamuri via Shwetha GS)
+ FALCON-441 Lineage capture fails for feeds with multiple instances
+ (Venkatesh Seetharam)
+
Release Version: 0.5-incubating
INCOMPATIBLE CHANGES
FALCON-11 Add support for security in Falcon (Venkatesh Seetharam)
@@ -148,9 +154,6 @@ Release Version: 0.5-incubating
FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
BUG FIXES
- FALCON-441 Lineage capture fails for feeds with multiple instances
- (Venkatesh Seetharam)
-
FALCON-440 Exclude IDEA IntelliJ and other unnecessary files from source
distribution (Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index a0a74e4..ece8982 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -24,6 +24,7 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
/**
@@ -49,7 +50,7 @@ public final class ProcessHelper {
public static Storage.TYPE getStorageType(org.apache.falcon.entity.v0.cluster.Cluster cluster,
Process process) throws FalconException {
Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
- if (process.getInputs() == null) {
+ if (process.getInputs() == null && process.getOutputs() == null) {
return storageType;
}
@@ -61,6 +62,17 @@ public final class ProcessHelper {
}
}
+ // If input feeds storage type is file system check storage type of output feeds
+ if (Storage.TYPE.FILESYSTEM == storageType) {
+ for (Output output : process.getOutputs().getOutputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+ storageType = FeedHelper.getStorageType(feed, cluster);
+ if (Storage.TYPE.TABLE == storageType) {
+ break;
+ }
+ }
+ }
+
return storageType;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
index 16bff02..faaddac 100644
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
@@ -246,7 +246,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
addOozieRetries(retWfApp);
- if (isTableStorageType(cluster, entity)) {
+ if (shouldSetupHiveConfiguration(cluster, entity)) {
setupHiveCredentials(cluster, wfPath, retWfApp);
}
@@ -325,7 +325,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
addOozieRetries(repWFapp);
- if (isTableStorageType(targetCluster, entity)) {
+ if (shouldSetupHiveConfiguration(targetCluster, entity)) {
setupHiveCredentials(targetCluster, sourceCluster, repWFapp);
}
@@ -719,4 +719,10 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
}
props.put("userWorkflowVersion", version);
}
+
+ protected boolean shouldSetupHiveConfiguration(Cluster cluster,
+ Feed feed) throws FalconException {
+ Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
+ return Storage.TYPE.TABLE == storageType;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index 7616df1..ad1af73 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -26,16 +26,11 @@ import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.ExternalId;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -550,21 +545,8 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
}
}
- private boolean isTableStorageType(Cluster cluster, T entity) throws FalconException {
- return entity.getEntityType() == EntityType.PROCESS
- ? isTableStorageType(cluster, (Process) entity)
- : isTableStorageType(cluster, (Feed) entity);
- }
-
- protected boolean isTableStorageType(Cluster cluster, Feed feed) throws FalconException {
- Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
- return Storage.TYPE.TABLE == storageType;
- }
-
- protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
- Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
- return Storage.TYPE.TABLE == storageType;
- }
+ protected abstract boolean shouldSetupHiveConfiguration(Cluster cluster,
+ T entity) throws FalconException;
protected void decorateWithOozieRetries(ACTION action) {
Properties props = RuntimeProperties.get();
@@ -588,7 +570,7 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, "working") + "/lib");
- if (isTableStorageType(cluster, entity)) {
+ if (shouldSetupHiveConfiguration(cluster, entity)) {
propagateHiveCredentials(cluster, properties);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 3d6bf7b..5089779 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -603,8 +603,8 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
throw new FalconException("Failed to add library extensions for the workflow", e);
}
- final boolean isTableStorageType = isTableStorageType(cluster, process);
- if (isTableStorageType) {
+ final boolean shouldConfigureHive = shouldSetupHiveConfiguration(cluster, process);
+ if (shouldConfigureHive) {
setupHiveCredentials(cluster, parentWfPath, wfApp);
}
@@ -620,12 +620,12 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
} else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
- decoratePIGAction(cluster, process, action.getPig(), parentWfPath, isTableStorageType);
+ decoratePIGAction(cluster, process, action.getPig(), parentWfPath, shouldConfigureHive);
} else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
decorateHiveAction(cluster, process, action, parentWfPath);
} else if (FALCON_ACTIONS.contains(actionName)) {
decorateWithOozieRetries(action);
- if (isTableStorageType && actionName.equals("recordsize")) {
+ if (shouldConfigureHive && actionName.equals("recordsize")) {
// adds hive-site.xml in actions classpath
action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
}
@@ -636,6 +636,17 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
marshal(cluster, wfApp, parentWfPath);
}
+ protected boolean shouldSetupHiveConfiguration(Cluster cluster,
+ Process process) throws FalconException {
+ return isTableStorageType(cluster, entity)
+ || EngineType.HIVE == process.getWorkflow().getEngine();
+ }
+
+ protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
+ Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
+ return Storage.TYPE.TABLE == storageType;
+ }
+
private void setupHiveCredentials(Cluster cluster, Path parentWfPath,
WORKFLOWAPP wfApp) throws FalconException {
// create hive-site.xml file so actions can use it in the classpath
@@ -648,7 +659,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
}
private void decoratePIGAction(Cluster cluster, Process process, PIG pigAction,
- Path parentWfPath, boolean isTableStorageType) throws FalconException {
+ Path parentWfPath, boolean shouldConfigureHive) throws FalconException {
Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
pigAction.setScript("${nameNode}" + userWfPath.toString());
@@ -660,7 +671,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
propagateProcessProperties(pigAction, process);
- if (isTableStorageType) { // adds hive-site.xml in pig classpath
+ if (shouldConfigureHive) { // adds hive-site.xml in pig classpath
pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
index 54c1809..5f0efe7 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
@@ -317,6 +317,177 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
}
+ @Test (dataProvider = "secureOptions")
+ public void testHiveProcessMapperWithFSInputFeedAndTableOutputFeed(String secureOption) throws Exception {
+ StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+ URL resource = this.getClass().getResource("/config/feed/feed-0.1.xml");
+ Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+
+ resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+ Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+ resource = this.getClass().getResource("/config/process/hive-process-FSInputFeed.xml");
+ Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+ prepare(process);
+ OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
+ Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+ builder.map(cluster, bundlePath);
+ assertTrue(fs.exists(bundlePath));
+
+ BUNDLEAPP bundle = getBundle(bundlePath);
+ assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+ assertEquals(1, bundle.getCoordinator().size());
+ assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+ bundle.getCoordinator().get(0).getName());
+ String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+ COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+
+ String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+ WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+ testParentWorkflow(process, parentWorkflow);
+
+ List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+ ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+ Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+ JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+ org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+ Assert.assertEquals(hiveAction.getScript(),
+ "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+ Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
+ Assert.assertNull(hiveAction.getPrepare());
+ Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+ Assert.assertFalse(hiveAction.getParam().isEmpty());
+ Assert.assertEquals(7, hiveAction.getParam().size());
+
+ Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+ assertHCatCredentials(parentWorkflow, wfPath);
+
+ ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+ }
+
+ @Test (dataProvider = "secureOptions")
+ public void testHiveProcessMapperWithTableInputFeedAndFSOutputFeed(String secureOption) throws Exception {
+ StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+ URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+ Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+ resource = this.getClass().getResource("/config/feed/feed-0.1.xml");
+ Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+
+ resource = this.getClass().getResource("/config/process/hive-process-FSOutputFeed.xml");
+ Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+ prepare(process);
+ OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
+ Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+ builder.map(cluster, bundlePath);
+ assertTrue(fs.exists(bundlePath));
+
+ BUNDLEAPP bundle = getBundle(bundlePath);
+ assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+ assertEquals(1, bundle.getCoordinator().size());
+ assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+ bundle.getCoordinator().get(0).getName());
+ String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+ COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+
+ String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+ WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+ testParentWorkflow(process, parentWorkflow);
+
+ List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+ ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+ Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+ JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+ org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+ Assert.assertEquals(hiveAction.getScript(),
+ "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+ Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
+ Assert.assertNotNull(hiveAction.getPrepare());
+ Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+ Assert.assertFalse(hiveAction.getParam().isEmpty());
+ Assert.assertEquals(6, hiveAction.getParam().size());
+
+ Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+ assertHCatCredentials(parentWorkflow, wfPath);
+
+ ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+ }
+
+ @Test (dataProvider = "secureOptions")
+ public void testHiveProcessWithNoInputsAndOutputs(String secureOption) throws Exception {
+ StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+ URL resource = this.getClass().getResource("/config/process/dumb-hive-process.xml");
+ Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+ prepare(process);
+ OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
+ Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+ builder.map(cluster, bundlePath);
+ assertTrue(fs.exists(bundlePath));
+
+ BUNDLEAPP bundle = getBundle(bundlePath);
+ assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+ assertEquals(1, bundle.getCoordinator().size());
+ assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+ bundle.getCoordinator().get(0).getName());
+ String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+ COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+
+ String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+ WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+ testParentWorkflow(process, parentWorkflow);
+
+ List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+ ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+ Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+ JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+ org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+ Assert.assertEquals(hiveAction.getScript(),
+ "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+ Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
+ Assert.assertNull(hiveAction.getPrepare());
+ Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+ Assert.assertTrue(hiveAction.getParam().isEmpty());
+
+ assertHCatCredentials(parentWorkflow, wfPath);
+
+ ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+ }
+
private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
Path hiveConfPath = new Path(wfPath, "conf/hive-site.xml");
Assert.assertTrue(fs.exists(hiveConfPath));
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/test/resources/config/process/dumb-hive-process.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/dumb-hive-process.xml b/process/src/test/resources/config/process/dumb-hive-process.xml
new file mode 100644
index 0000000..c504074
--- /dev/null
+++ b/process/src/test/resources/config/process/dumb-hive-process.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+ <!-- where -->
+ <clusters>
+ <cluster name="corp">
+ <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+ </cluster>
+ </clusters>
+
+ <!-- when -->
+ <parallel>1</parallel>
+ <order>LIFO</order>
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+
+ <!-- what = none -->
+
+ <!-- how -->
+ <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+ <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/test/resources/config/process/hive-process-FSInputFeed.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/hive-process-FSInputFeed.xml b/process/src/test/resources/config/process/hive-process-FSInputFeed.xml
new file mode 100644
index 0000000..d871377
--- /dev/null
+++ b/process/src/test/resources/config/process/hive-process-FSInputFeed.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+ <!-- where -->
+ <clusters>
+ <cluster name="corp">
+ <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+ </cluster>
+ </clusters>
+
+ <!-- when -->
+ <parallel>1</parallel>
+ <order>LIFO</order>
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+
+ <!-- what -->
+ <inputs>
+ <input name="input" feed="clicks" start="yesterday(0,0)" end="yesterday(20,0)"/>
+ </inputs>
+
+ <outputs>
+ <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+ </outputs>
+
+ <!-- how -->
+ <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+ <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/test/resources/config/process/hive-process-FSOutputFeed.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/hive-process-FSOutputFeed.xml b/process/src/test/resources/config/process/hive-process-FSOutputFeed.xml
new file mode 100644
index 0000000..23d96c3
--- /dev/null
+++ b/process/src/test/resources/config/process/hive-process-FSOutputFeed.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+ <!-- where -->
+ <clusters>
+ <cluster name="corp">
+ <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+ </cluster>
+ </clusters>
+
+ <!-- when -->
+ <parallel>1</parallel>
+ <order>LIFO</order>
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+
+ <!-- what -->
+ <inputs>
+ <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+ </inputs>
+
+ <outputs>
+ <output name="output" feed="clicks" instance="today(0,0)"/>
+ </outputs>
+
+ <!-- how -->
+ <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+ <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>