You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/03/06 01:49:38 UTC
[2/6] git commit: FALCON-286 Capture information in process entity
about the user workflow. Contributed by Venkatesh Seetharam
FALCON-286 Capture information in process entity about the user workflow. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/bb5e1449
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/bb5e1449
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/bb5e1449
Branch: refs/heads/master
Commit: bb5e144970cf0336be859e5576afd8908861e687
Parents: 42d04f3
Author: Venkatesh Seetharam <ve...@hortonworks.com>
Authored: Wed Mar 5 15:56:59 2014 -0800
Committer: Venkatesh Seetharam <ve...@hortonworks.com>
Committed: Wed Mar 5 15:56:59 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 3 +
client/src/main/resources/process-0.1.xsd | 2 +
.../org/apache/falcon/entity/ProcessHelper.java | 5 ++
.../falcon/converter/OozieFeedMapper.java | 22 ++++++-
.../falcon/converter/OozieFeedMapperTest.java | 9 ++-
.../falcon/converter/OozieProcessMapper.java | 30 +++++++++-
.../converter/OozieProcessMapperTest.java | 63 +++++++++++++++++++-
.../resources/config/process/dumb-process.xml | 40 +++++++++++++
.../resources/config/process/process-0.1.xml | 2 +-
9 files changed, 166 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b536b8..5d7eb6f 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -222,6 +222,9 @@ Release Version: 0.4-incubating
FALCON-198 Update LICENSE.txt to contain license information for all third-party libraries
+ FALCON-286 Capture information in process entity about the user workflow
+ (Venkatesh Seetharam)
+
BUG FIXES
FALCON-223: hive-exec bundles protobuf-2.4.1 which is incompatible with hadoop-2 requiring protobuf-2.5
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index 701fa51..f98822d 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -283,6 +283,8 @@
</xs:complexType>
<xs:complexType name="workflow">
+ <xs:attribute type="xs:string" name="name" use="optional"/>
+ <xs:attribute type="xs:string" name="version" use="optional" default="1.0"/>
<xs:attribute type="engine-type" name="engine" use="optional" default="oozie"/>
<xs:attribute type="xs:string" name="path" use="required"/>
<xs:attribute type="xs:string" name="lib" use="optional"/>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index dc5ae7a..46e7384 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -18,6 +18,7 @@
package org.apache.falcon.entity;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Process;
@@ -36,4 +37,8 @@ public final class ProcessHelper {
}
return null;
}
+
+    /**
+     * Resolves the name reported for a process's user workflow.
+     *
+     * @param workflowName name declared on the workflow; may be null or empty
+     * @param processName  name of the owning process, used to build the fallback
+     * @return {@code workflowName} when it is non-empty, otherwise
+     *         {@code processName + "-workflow"}
+     */
+    public static String getProcessWorkflowName(String workflowName, String processName) {
+        if (StringUtils.isEmpty(workflowName)) {
+            return processName + "-workflow";
+        }
+        return workflowName;
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index d6dee77..e589c02 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -46,6 +46,7 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.coordinator.WORKFLOW;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.BuildProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -189,6 +190,11 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
props.put(ARG.feedNames.getPropName(), feed.getName());
props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
+ props.put("falconInputFeeds", feed.getName());
+ props.put("falconInPaths", "IGNORE");
+
+ propagateUserWorkflowProperties(props, "eviction");
+
retentionWorkflow.setConfiguration(getCoordConfig(props));
retentionAction.setWorkflow(retentionWorkflow);
@@ -432,6 +438,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
}
propagateLateDataProperties(feed, instancePaths, sourceStorage.getType().name(), props);
+ propagateUserWorkflowProperties(props, "replication");
replicationWF.setConfiguration(getCoordConfig(props));
replicationAction.setWorkflow(replicationWF);
@@ -547,7 +554,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
// falcon post processing
props.put(ARG.feedNames.getPropName(), feed.getName());
- props.put(ARG.feedInstancePaths.getPropName(), instancePaths);
+ props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
}
}
@@ -563,4 +570,17 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
}
}
}
+
+ private void propagateUserWorkflowProperties(Map<String, String> props, String policy) {
+ props.put("userWorkflowName", policy + "-policy");
+ props.put("userWorkflowEngine", "falcon");
+
+ String version;
+ try {
+ version = BuildProperties.get().getProperty("build.version");
+ } catch (Exception e) { // unfortunate that this is only available in prism/webapp
+ version = "0.5";
+ }
+ props.put("userWorkflowVersion", version);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index a153462..671c53c 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -202,7 +202,12 @@ public class OozieFeedMapperTest {
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), feed.getName());
- Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataIn('input')}");
+ Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+
+ // verify workflow params
+ Assert.assertEquals("replication-policy", props.get("userWorkflowName"));
+ Assert.assertEquals("0.5", props.get("userWorkflowVersion"));
+ Assert.assertEquals("falcon", props.get("userWorkflowEngine"));
assertLibExtensions(coord, "replication");
assertWorkflowRetries(coord);
@@ -434,7 +439,7 @@ public class OozieFeedMapperTest {
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
- Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataIn('input')}");
+ Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
}
private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 6d0297e..e638961 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -190,7 +190,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
initializeOutputPaths(cluster, process, coord, props); // outputs
Workflow processWorkflow = process.getWorkflow();
- props.put("userWorkflowEngine", processWorkflow.getEngine().value());
+ propagateUserWorkflowProperties(processWorkflow, props, process.getName());
// create parent wf
createWorkflow(cluster, process, processWorkflow, coordName, coordPath);
@@ -247,6 +247,8 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
Map<String, String> props) throws FalconException {
if (process.getInputs() == null) {
+ props.put("falconInputFeeds", "NONE");
+ props.put("falconInPaths", "IGNORE");
return;
}
@@ -281,7 +283,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
}
- inputFeeds.add(input.getName());
+ inputFeeds.add(feed.getName());
inputPaths.add(inputExpr);
inputFeedStorageTypes.add(storage.getType().name());
}
@@ -303,6 +305,8 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
Map<String, String> props) throws FalconException {
if (process.getOutputs() == null) {
+ props.put(ARG.feedNames.getPropName(), "NONE");
+ props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
return;
}
@@ -327,7 +331,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
coord.getOutputEvents().getDataOut().add(dataout);
String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
- outputFeeds.add(output.getName());
+ outputFeeds.add(feed.getName());
outputPaths.add(outputExpr);
if (storage.getType() == Storage.TYPE.FILESYSTEM) {
@@ -486,6 +490,14 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
return props;
}
+    /**
+     * Records the name, version and engine of the process's user workflow in
+     * the given properties.
+     *
+     * @param processWorkflow workflow definition of the process
+     * @param props           properties map to update in place
+     * @param processName     process name, handed to {@code ProcessHelper} together
+     *                        with the declared workflow name to resolve the reported name
+     */
+    private void propagateUserWorkflowProperties(Workflow processWorkflow,
+                                                 Map<String, String> props, String processName) {
+        String declaredName = processWorkflow.getName();
+        String reportedName = ProcessHelper.getProcessWorkflowName(declaredName, processName);
+
+        props.put("userWorkflowName", reportedName);
+        props.put("userWorkflowVersion", processWorkflow.getVersion());
+        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
+    }
+
protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
String wfName, Path parentWfPath) throws FalconException {
WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
@@ -615,6 +627,10 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
final List<String> deleteList = new ArrayList<String>();
+ if (process.getOutputs() == null) {
+ return deleteList;
+ }
+
for (Output output : process.getOutputs().getOutputs()) {
Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
@@ -630,6 +646,10 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
String engineType) throws FalconException {
+ if (process.getInputs() == null) {
+ return;
+ }
+
for (Input input : process.getInputs().getInputs()) {
Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
Storage storage = FeedHelper.createStorage(cluster, feed);
@@ -653,6 +673,10 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
private void addOutputFeedsAsParams(List<String> paramList, Process process,
Cluster cluster) throws FalconException {
+ if (process.getOutputs() == null) {
+ return;
+ }
+
for (Output output : process.getOutputs().getOutputs()) {
Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
Storage storage = FeedHelper.createStorage(cluster, feed);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index b4c059a..22bf9fe 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -38,6 +38,8 @@ import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Validity;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.messaging.EntityInstanceMessage;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
@@ -63,7 +65,11 @@ import javax.xml.bind.Unmarshaller;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
@@ -332,12 +338,12 @@ public class OozieProcessMapperTest extends AbstractTestBase {
}
// verify the late data params
- Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getName());
+ Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
// verify the post processing params
- Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getName());
+ Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
}
@@ -374,6 +380,14 @@ public class OozieProcessMapperTest extends AbstractTestBase {
}
}
+    /**
+     * Verifies that the name and version declared on the workflow of the
+     * "clicksummary" process are exposed by the process entity.
+     */
+    @Test
+    public void testProcessWorkflowMapper() throws Exception {
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+        Workflow processWorkflow = process.getWorkflow();
+        // Assert.assertEquals is used as (actual, expected) throughout this class;
+        // keep that order so a failure reports the right value as "expected".
+        Assert.assertEquals(processWorkflow.getName(), "test");
+        Assert.assertEquals(processWorkflow.getVersion(), "1.0.0");
+    }
+
@SuppressWarnings("unchecked")
private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
@@ -496,5 +510,48 @@ public class OozieProcessMapperTest extends AbstractTestBase {
ConfigurationStore.get().remove(EntityType.PROCESS, "table-process");
ConfigurationStore.get().remove(EntityType.FEED, "clicks-raw-table");
ConfigurationStore.get().remove(EntityType.FEED, "clicks-summary-table");
+ ConfigurationStore.get().remove(EntityType.PROCESS, "dumb-process");
+ }
+
+    /**
+     * Maps a process that declares neither inputs nor outputs and checks that
+     * the generated coordinator's workflow configuration still contains the
+     * feed and user-workflow properties.
+     */
+    @Test
+    public void testProcessWithNoInputsAndOutputs() throws Exception {
+        Cluster corpCluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+        ClusterHelper.getInterface(corpCluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+
+        // load the process definition and register it in the store
+        URL processXml = this.getClass().getResource("/config/process/dumb-process.xml");
+        Process dumbProcess = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(processXml);
+        ConfigurationStore.get().publish(EntityType.PROCESS, dumbProcess);
+
+        // generate the bundle under the staging location
+        OozieProcessMapper processMapper = new OozieProcessMapper(dumbProcess);
+        Path stagingPath = new Path("/falcon/staging/workflows", dumbProcess.getName());
+        processMapper.map(corpCluster, stagingPath);
+        assertTrue(fs.exists(stagingPath));
+
+        // the bundle must hold exactly one (default) coordinator
+        BUNDLEAPP bundleApp = getBundle(stagingPath);
+        assertEquals(EntityUtil.getWorkflowName(dumbProcess).toString(), bundleApp.getName());
+        assertEquals(1, bundleApp.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, dumbProcess).toString(),
+                bundleApp.getCoordinator().get(0).getName());
+        String coordinatorPath = bundleApp.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        // flatten the coordinator's workflow configuration into a map
+        COORDINATORAPP coordinatorApp = getCoordinator(new Path(coordinatorPath));
+        HashMap<String, String> workflowProps = new HashMap<String, String>();
+        for (Property entry : coordinatorApp.getAction().getWorkflow().getConfiguration().getProperty()) {
+            workflowProps.put(entry.getName(), entry.getValue());
+        }
+
+        String[] requiredKeys = {
+            EntityInstanceMessage.ARG.feedNames.getPropName(),
+            EntityInstanceMessage.ARG.feedInstancePaths.getPropName(),
+            "falconInputFeeds",
+            "falconInPaths",
+            "userWorkflowName",
+            "userWorkflowVersion",
+            "userWorkflowEngine",
+        };
+
+        for (String key : requiredKeys) {
+            Assert.assertTrue(workflowProps.containsKey(key), "expected property missing: " + key);
+        }
     }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/process/src/test/resources/config/process/dumb-process.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/dumb-process.xml b/process/src/test/resources/config/process/dumb-process.xml
new file mode 100644
index 0000000..b71f089
--- /dev/null
+++ b/process/src/test/resources/config/process/dumb-process.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<process name="dumb-process" xmlns="uri:falcon:process:0.1">
+ <!-- where -->
+ <clusters>
+ <cluster name="corp">
+ <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+ </cluster>
+ </clusters>
+
+ <!-- when -->
+ <parallel>1</parallel>
+ <order>LIFO</order>
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+
+ <!-- what = none -->
+
+ <!-- how -->
+ <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+ <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+</process>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/process/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/process-0.1.xml b/process/src/test/resources/config/process/process-0.1.xml
index 975d1a4..6148441 100644
--- a/process/src/test/resources/config/process/process-0.1.xml
+++ b/process/src/test/resources/config/process/process-0.1.xml
@@ -34,7 +34,7 @@
<property name="mapred.job.priority" value="LOW"/>
</properties>
- <workflow engine="oozie" path="/user/guest/workflow"/>
+ <workflow name="test" version="1.0.0" engine="oozie" path="/user/guest/workflow"/>
<retry policy="periodic" delay="hours(10)" attempts="3"/>