You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/19 01:10:30 UTC

git commit: FALCON-614 Add pipeline element to process entity. Contributed by Balu Vellanki

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 3fc1e8dbb -> 69e0bff31


FALCON-614 Add pipeline element to process entity. Contributed by Balu Vellanki


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/69e0bff3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/69e0bff3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/69e0bff3

Branch: refs/heads/master
Commit: 69e0bff317172114e2f86d9b8808c55ec850abaa
Parents: 3fc1e8d
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Mon Aug 18 16:10:38 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Mon Aug 18 16:10:38 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 client/src/main/resources/process-0.1.xsd       | 14 +++++
 .../entity/parser/ProcessEntityParserTest.java  | 20 +++++++
 .../resources/config/process/process-0.1.xml    |  2 +
 .../config/process/process-bad-pipeline.xml     | 56 ++++++++++++++++++++
 docs/src/site/twiki/EntitySpecification.twiki   | 17 ++++++
 6 files changed, 112 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/69e0bff3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 085773b..b4dfdc4 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-614 Add pipeline element to process entity
+   (Balu Vellanki via Venkatesh Seetharam)
+
    FALCON-588 Baselining designer code. (samar via Shwetha GS)
 
    FALCON-400 Add Authorization for Entities (Venkatesh Seetharam)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/69e0bff3/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index cd4f5d2..06a2fe4 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -64,6 +64,15 @@
                     </xs:documentation>
                 </xs:annotation>
             </xs:element>
+            <xs:element type="PIPELINE_LIST" name="pipelines" minOccurs="0" maxOccurs="1">
+                <xs:annotation>
+                    <xs:documentation>
+                        pipelines: a process specifies an optional list of pipeline names,
+                        separated by commas, which is used for classification of processes.
+                        Example: dataReplicationPipeline, clickStreamPipeline
+                    </xs:documentation>
+                </xs:annotation>
+            </xs:element>
             <xs:element type="clusters" name="clusters">
                 <xs:annotation>
                     <xs:documentation>Defines the clusters where the workflow should run
@@ -353,6 +362,11 @@
             <xs:pattern value="(\w+=[^,]+)?([,]?[ ]*[\w]+=[^,]+)*"/>
         </xs:restriction>
     </xs:simpleType>
+    <xs:simpleType name="PIPELINE_LIST">
+        <xs:restriction base="xs:string">
+            <xs:pattern value="([\w+_]+)([,]?[ ]*([\w+_]+))*"/>
+        </xs:restriction>
+    </xs:simpleType>
 
     <xs:complexType name="ACL">
         <xs:annotation>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/69e0bff3/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index 3513dab..80a9cc7 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -95,6 +95,10 @@ public class ProcessEntityParserTest extends AbstractTestBase {
         Assert.assertEquals(process.getFrequency().toString(), "hours(1)");
         Assert.assertEquals(process.getEntityType(), EntityType.PROCESS);
 
+        Assert.assertEquals(process.getTags(),
+                "consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting");
+        Assert.assertEquals(process.getPipelines(), "testPipeline,dataReplication_Pipeline");
+
         Assert.assertEquals(process.getInputs().getInputs().get(0).getName(), "impression");
         Assert.assertEquals(process.getInputs().getInputs().get(0).getFeed(), "impressionFeed");
         Assert.assertEquals(process.getInputs().getInputs().get(0).getStart(), "today(0,0)");
@@ -463,4 +467,20 @@ public class ProcessEntityParserTest extends AbstractTestBase {
             StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
         }
     }
+
+    /**
+     * Negative test: parsing a process whose pipelines element is not a valid comma separated
+     * list of pipeline names must fail with an UnmarshalException as the cause.
+     * @throws FalconException not expected to propagate; the parse failure is caught and verified
+     */
+    @Test
+    public void testPipelineTags() throws FalconException {
+        try {
+            InputStream stream = this.getClass().getResourceAsStream("/config/process/process-bad-pipeline.xml");
+            parser.parse(stream);
+            Assert.fail("FalconException caused by javax.xml.bind.UnmarshalException should have been thrown.");
+        } catch (FalconException e) {
+            Assert.assertEquals(e.getCause().getClass(), javax.xml.bind.UnmarshalException.class);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/69e0bff3/common/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-0.1.xml b/common/src/test/resources/config/process/process-0.1.xml
index 6e27577..99a0376 100644
--- a/common/src/test/resources/config/process/process-0.1.xml
+++ b/common/src/test/resources/config/process/process-0.1.xml
@@ -17,6 +17,8 @@
   limitations under the License.
   -->
 <process name="sample" xmlns="uri:falcon:process:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+    <pipelines>testPipeline,dataReplication_Pipeline</pipelines>
     <clusters>
         <cluster name="testCluster">
             <validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/69e0bff3/common/src/test/resources/config/process/process-bad-pipeline.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-bad-pipeline.xml b/common/src/test/resources/config/process/process-bad-pipeline.xml
new file mode 100644
index 0000000..e506bd9
--- /dev/null
+++ b/common/src/test/resources/config/process/process-bad-pipeline.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="sample" xmlns="uri:falcon:process:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+    <pipelines>etl-pipeline fetl-pipeline, ,</pipelines>
+    <clusters>
+        <cluster name="testCluster">
+            <validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+
+    <!-- what -->
+    <inputs>
+        <input name="impression" feed="impressionFeed" start="today(0,0)" end="today(2,0)" partition="*/US"/>
+        <input name="clicks" feed="clicksFeed" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="impOutput" feed="imp-click-join1" instance="today(0,0)"/>
+        <output name="clicksOutput" feed="imp-click-join2" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <properties>
+        <property name="name1" value="value1"/>
+        <property name="name2" value="value2"/>
+    </properties>
+
+    <workflow engine="oozie" path="/falcon/test/workflow"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+
+    <late-process policy="exp-backoff" delay="hours(1)">
+        <late-input input="impression" workflow-path="himpression/late/workflow"/>
+        <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow"/>
+    </late-process>
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/69e0bff3/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 4eb4b17..df572cb 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -280,6 +280,23 @@ Syntax:
 </process>
 </verbatim>
 
+---+++ Tags
+An optional list of comma separated tags which are used for classification of processes.
+Syntax:
+<verbatim>
+...
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+</verbatim>
+
+---+++ Pipelines
+An optional comma separated list of pipeline names, specifying the data processing pipeline(s) to which this process belongs.
+Only letters, numbers and underscores are allowed in a pipeline name.
+Syntax:
+<verbatim>
+...
+    <pipelines>test_Pipeline, dataReplication, clickStream_pipeline</pipelines>
+</verbatim>
+
 ---+++ Cluster
 The cluster on which the workflow should run. A process should contain one or more clusters. Cluster definition for the cluster name gives the end points for workflow execution, name node, job tracker, messaging and so on. Each cluster inturn has validity mentioned, which tell the times between which the job should run on that specified cluster. 
 Syntax: