You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/03/06 01:49:38 UTC

[2/6] git commit: FALCON-286 Capture information in process entity about the user workflow. Contributed by Venkatesh Seetharam

FALCON-286 Capture information in process entity about the user workflow. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/bb5e1449
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/bb5e1449
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/bb5e1449

Branch: refs/heads/master
Commit: bb5e144970cf0336be859e5576afd8908861e687
Parents: 42d04f3
Author: Venkatesh Seetharam <ve...@hortonworks.com>
Authored: Wed Mar 5 15:56:59 2014 -0800
Committer: Venkatesh Seetharam <ve...@hortonworks.com>
Committed: Wed Mar 5 15:56:59 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 client/src/main/resources/process-0.1.xsd       |  2 +
 .../org/apache/falcon/entity/ProcessHelper.java |  5 ++
 .../falcon/converter/OozieFeedMapper.java       | 22 ++++++-
 .../falcon/converter/OozieFeedMapperTest.java   |  9 ++-
 .../falcon/converter/OozieProcessMapper.java    | 30 +++++++++-
 .../converter/OozieProcessMapperTest.java       | 63 +++++++++++++++++++-
 .../resources/config/process/dumb-process.xml   | 40 +++++++++++++
 .../resources/config/process/process-0.1.xml    |  2 +-
 9 files changed, 166 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b536b8..5d7eb6f 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -222,6 +222,9 @@ Release Version: 0.4-incubating
 
     FALCON-198 Update LICENSE.txt to contain license information for all third-party libraries
 
+    FALCON-286 Capture information in process entity about the user workflow
+    (Venkatesh Seetharam)
+
   BUG FIXES
 
     FALCON-223: hive-exec bundles protobuf-2.4.1 which is incompatible with hadoop-2 requiring protobuf-2.5

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index 701fa51..f98822d 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -283,6 +283,8 @@
     </xs:complexType>
 
     <xs:complexType name="workflow">
+        <xs:attribute type="xs:string" name="name" use="optional"/>
+        <xs:attribute type="xs:string" name="version" use="optional" default="1.0"/>
         <xs:attribute type="engine-type" name="engine" use="optional" default="oozie"/>
         <xs:attribute type="xs:string" name="path" use="required"/>
         <xs:attribute type="xs:string" name="lib" use="optional"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index dc5ae7a..46e7384 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.entity;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
 
@@ -36,4 +37,8 @@ public final class ProcessHelper {
         }
         return null;
     }
+
+    public static String getProcessWorkflowName(String workflowName, String processName) {
+        return StringUtils.isEmpty(workflowName) ? processName + "-workflow" : workflowName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index d6dee77..e589c02 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -46,6 +46,7 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.BuildProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -189,6 +190,11 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
             props.put(ARG.feedNames.getPropName(), feed.getName());
             props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
 
+            props.put("falconInputFeeds", feed.getName());
+            props.put("falconInPaths", "IGNORE");
+
+            propagateUserWorkflowProperties(props, "eviction");
+
             retentionWorkflow.setConfiguration(getCoordConfig(props));
             retentionAction.setWorkflow(retentionWorkflow);
 
@@ -432,6 +438,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
                 }
 
                 propagateLateDataProperties(feed, instancePaths, sourceStorage.getType().name(), props);
+                propagateUserWorkflowProperties(props, "replication");
 
                 replicationWF.setConfiguration(getCoordConfig(props));
                 replicationAction.setWorkflow(replicationWF);
@@ -547,7 +554,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
 
             // falcon post processing
             props.put(ARG.feedNames.getPropName(), feed.getName());
-            props.put(ARG.feedInstancePaths.getPropName(), instancePaths);
+            props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
         }
     }
 
@@ -563,4 +570,17 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
             }
         }
     }
+
+    private void propagateUserWorkflowProperties(Map<String, String> props, String policy) {
+        props.put("userWorkflowName", policy + "-policy");
+        props.put("userWorkflowEngine", "falcon");
+
+        String version;
+        try {
+            version = BuildProperties.get().getProperty("build.version");
+        } catch (Exception e) {  // unfortunate that this is only available in prism/webapp
+            version = "0.5";
+        }
+        props.put("userWorkflowVersion", version);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index a153462..671c53c 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -202,7 +202,12 @@ public class OozieFeedMapperTest {
 
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), feed.getName());
-        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+
+        // verify workflow params
+        Assert.assertEquals("replication-policy", props.get("userWorkflowName"));
+        Assert.assertEquals("0.5", props.get("userWorkflowVersion"));
+        Assert.assertEquals("falcon", props.get("userWorkflowEngine"));
 
         assertLibExtensions(coord, "replication");
         assertWorkflowRetries(coord);
@@ -434,7 +439,7 @@ public class OozieFeedMapperTest {
 
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
-        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
     }
 
     private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 6d0297e..e638961 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -190,7 +190,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         initializeOutputPaths(cluster, process, coord, props);  // outputs
 
         Workflow processWorkflow = process.getWorkflow();
-        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
+        propagateUserWorkflowProperties(processWorkflow, props, process.getName());
 
         // create parent wf
         createWorkflow(cluster, process, processWorkflow, coordName, coordPath);
@@ -247,6 +247,8 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
     private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
                                       Map<String, String> props) throws FalconException {
         if (process.getInputs() == null) {
+            props.put("falconInputFeeds", "NONE");
+            props.put("falconInPaths", "IGNORE");
             return;
         }
 
@@ -281,7 +283,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
                 propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
             }
 
-            inputFeeds.add(input.getName());
+            inputFeeds.add(feed.getName());
             inputPaths.add(inputExpr);
             inputFeedStorageTypes.add(storage.getType().name());
         }
@@ -303,6 +305,8 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
     private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
                                        Map<String, String> props) throws FalconException {
         if (process.getOutputs() == null) {
+            props.put(ARG.feedNames.getPropName(), "NONE");
+            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
             return;
         }
 
@@ -327,7 +331,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
             coord.getOutputEvents().getDataOut().add(dataout);
 
             String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
-            outputFeeds.add(output.getName());
+            outputFeeds.add(feed.getName());
             outputPaths.add(outputExpr);
 
             if (storage.getType() == Storage.TYPE.FILESYSTEM) {
@@ -486,6 +490,14 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         return props;
     }
 
+    private void propagateUserWorkflowProperties(Workflow processWorkflow,
+                                                 Map<String, String> props, String processName) {
+        props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
+                processWorkflow.getName(), processName));
+        props.put("userWorkflowVersion", processWorkflow.getVersion());
+        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
+    }
+
     protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
                                   String wfName, Path parentWfPath) throws FalconException {
         WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
@@ -615,6 +627,10 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
     private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
         final List<String> deleteList = new ArrayList<String>();
+        if (process.getOutputs() == null) {
+            return deleteList;
+        }
+
         for (Output output : process.getOutputs().getOutputs()) {
             Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
 
@@ -630,6 +646,10 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
     private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
                                        String engineType) throws FalconException {
+        if (process.getInputs() == null) {
+            return;
+        }
+
         for (Input input : process.getInputs().getInputs()) {
             Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
             Storage storage = FeedHelper.createStorage(cluster, feed);
@@ -653,6 +673,10 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
     private void addOutputFeedsAsParams(List<String> paramList, Process process,
                                         Cluster cluster) throws FalconException {
+        if (process.getOutputs() == null) {
+            return;
+        }
+
         for (Output output : process.getOutputs().getOutputs()) {
             Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
             Storage storage = FeedHelper.createStorage(cluster, feed);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index b4c059a..22bf9fe 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -38,6 +38,8 @@ import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Validity;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.messaging.EntityInstanceMessage;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
 import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
@@ -63,7 +65,11 @@ import javax.xml.bind.Unmarshaller;
 import javax.xml.transform.stream.StreamSource;
 import javax.xml.validation.Schema;
 import javax.xml.validation.SchemaFactory;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.net.URL;
 import java.util.Collections;
 import java.util.HashMap;
@@ -332,12 +338,12 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         }
 
         // verify the late data params
-        Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getName());
+        Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
         Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
         Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
 
         // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getName());
+        Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
         Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
     }
 
@@ -374,6 +380,14 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         }
     }
 
+    @Test
+    public void testProcessWorkflowMapper() throws Exception {
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+        Workflow processWorkflow = process.getWorkflow();
+        Assert.assertEquals("test", processWorkflow.getName());
+        Assert.assertEquals("1.0.0", processWorkflow.getVersion());
+    }
+
     @SuppressWarnings("unchecked")
     private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
@@ -496,5 +510,48 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         ConfigurationStore.get().remove(EntityType.PROCESS, "table-process");
         ConfigurationStore.get().remove(EntityType.FEED, "clicks-raw-table");
         ConfigurationStore.get().remove(EntityType.FEED, "clicks-summary-table");
+        ConfigurationStore.get().remove(EntityType.PROCESS, "dumb-process");
+    }
+
+    @Test
+    public void testProcessWithNoInputsAndOutputs() throws Exception {
+        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+
+        URL resource = this.getClass().getResource("/config/process/dumb-process.xml");
+        Process processEntity = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, processEntity);
+
+        OozieProcessMapper mapper = new OozieProcessMapper(processEntity);
+        Path bundlePath = new Path("/falcon/staging/workflows", processEntity.getName());
+        mapper.map(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(processEntity).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, processEntity).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        String[] expected = {
+            EntityInstanceMessage.ARG.feedNames.getPropName(),
+            EntityInstanceMessage.ARG.feedInstancePaths.getPropName(),
+            "falconInputFeeds",
+            "falconInPaths",
+            "userWorkflowName",
+            "userWorkflowVersion",
+            "userWorkflowEngine",
+        };
+
+        for (String property : expected) {
+            Assert.assertTrue(props.containsKey(property), "expected property missing: " + property);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/process/src/test/resources/config/process/dumb-process.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/dumb-process.xml b/process/src/test/resources/config/process/dumb-process.xml
new file mode 100644
index 0000000..b71f089
--- /dev/null
+++ b/process/src/test/resources/config/process/dumb-process.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="dumb-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what = none -->
+
+    <!-- how -->
+    <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bb5e1449/process/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/process-0.1.xml b/process/src/test/resources/config/process/process-0.1.xml
index 975d1a4..6148441 100644
--- a/process/src/test/resources/config/process/process-0.1.xml
+++ b/process/src/test/resources/config/process/process-0.1.xml
@@ -34,7 +34,7 @@
         <property name="mapred.job.priority" value="LOW"/>
     </properties>
 
-    <workflow engine="oozie" path="/user/guest/workflow"/>
+    <workflow name="test" version="1.0.0" engine="oozie" path="/user/guest/workflow"/>
 
     <retry policy="periodic" delay="hours(10)" attempts="3"/>