You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/01/14 17:46:30 UTC

git commit: FALCON-38 Falcon's parent workflow actions (pre-processing & post-processing) should have multiple retries. Contributed by shaik.idris

Updated Branches:
  refs/heads/master abd3fa48a -> 2bd7053b0


FALCON-38 Falcon's parent workflow actions (pre-processing & post-processing) should have multiple retries. Contributed by shaik.idris


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/2bd7053b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/2bd7053b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/2bd7053b

Branch: refs/heads/master
Commit: 2bd7053b03f83a4c83383766728c3de76222ed89
Parents: abd3fa4
Author: shaikidris <ps...@gmail.com>
Authored: Tue Jan 14 22:12:28 2014 +0530
Committer: shaikidris <ps...@gmail.com>
Committed: Tue Jan 14 22:12:28 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 ++
 common/src/main/resources/runtime.properties    |  2 +
 .../falcon/converter/OozieFeedMapper.java       | 16 ++++++
 .../falcon/converter/OozieFeedMapperTest.java   | 22 +++++++-
 .../converter/AbstractOozieEntityMapper.java    | 13 +++++
 oozie/src/main/resources/oozie-workflow-0.3.xsd | 53 ++++++++++++++------
 .../falcon/converter/OozieProcessMapper.java    |  3 ++
 .../converter/OozieProcessMapperTest.java       |  6 +++
 8 files changed, 104 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bd7053b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6eac1ef..c5ceb7e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,10 +15,14 @@ Trunk (Unreleased)
     
     FALCON-66 Make oozie version change configurable. (Shwetha GS
     via Srikanth Sundarrajan)
+    
+    FALCON-38 Falcon's parent workflow actions (pre-processing & prost-processing) should have multiple retries. (Shaik Idris)
+
 
   OPTIMIZATIONS
 
   BUG FIXES
+
     FALCON-262 Example files should use aligned dependency versions. (Jean-Baptiste Onofré
     via Shwetha GS)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bd7053b/common/src/main/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
index 26c916d..87b9d1e 100644
--- a/common/src/main/resources/runtime.properties
+++ b/common/src/main/resources/runtime.properties
@@ -23,3 +23,5 @@
 *.log.cleanup.frequency.days.retention =days(7)
 *.log.cleanup.frequency.months.retention =months(3)
 
+*.falcon.parentworkflow.retry.max=3
+*.falcon.parentworkflow.retry.interval.secs=1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bd7053b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index fb066ab..6ca2134 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -198,6 +198,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
                 WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
                 retWfApp.setName(wfName);
                 addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
+                addOozieRetries(retWfApp);
                 marshal(cluster, retWfApp, wfPath);
             } catch(IOException e) {
                 throw new FalconException("Unable to create retention workflow", e);
@@ -220,10 +221,12 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
                 WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
                 repWFapp.setName(wfName);
                 addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
+                addOozieRetries(repWFapp);
                 marshal(cluster, repWFapp, wfPath);
             } catch(IOException e) {
                 throw new FalconException("Unable to create replication workflow", e);
             }
+
         }
 
         private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster,
@@ -543,4 +546,17 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
             props.put(ARG.feedInstancePaths.getPropName(), instancePaths);
         }
     }
+
+    private void addOozieRetries(WORKFLOWAPP workflow) {
+        for (Object object : workflow.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+                continue;
+            }
+            org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
+            String actionName = action.getName();
+            if (FALCON_ACTIONS.contains(actionName)) {
+                decorateWithOozieRetries(action);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bd7053b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index de7f9e5..128784e 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -203,6 +203,23 @@ public class OozieFeedMapperTest {
         Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataIn('input')}");
 
         assertLibExtensions(coord, "replication");
+        assertWorkflowRetries(coord);
+    }
+
+    private void assertWorkflowRetries(COORDINATORAPP coord) throws JAXBException, IOException {
+        WORKFLOWAPP wf = getWorkflowapp(coord);
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            String actionName = action.getName();
+            if (AbstractOozieEntityMapper.FALCON_ACTIONS.contains(actionName)) {
+                Assert.assertEquals(action.getRetryMax(), "3");
+                Assert.assertEquals(action.getRetryInterval(), "1");
+            }
+        }
     }
 
     private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
@@ -430,7 +447,7 @@ public class OozieFeedMapperTest {
     }
 
     @Test
-    public void testRetentionCoords() throws FalconException {
+    public void testRetentionCoords() throws FalconException, JAXBException, IOException {
         org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
         final Calendar instance = Calendar.getInstance();
         instance.roll(Calendar.YEAR, 1);
@@ -468,5 +485,8 @@ public class OozieFeedMapperTest {
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), feed.getName());
         Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+
+        assertWorkflowRetries(coord);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bd7053b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
index ad095fd..cecdeef 100644
--- a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
+++ b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
@@ -39,6 +39,7 @@ import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.service.FalconPathFilter;
 import org.apache.falcon.service.SharedLibraryHostingService;
+import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -51,10 +52,14 @@ import org.apache.oozie.client.OozieClient;
 
 import javax.xml.bind.*;
 import java.io.*;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Map.Entry;
+import java.util.Set;
 
 /**
  * Entity mapper base class that allows an entity to be mapped to oozie bundle.
@@ -75,6 +80,8 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
     protected static final JAXBContext COORD_JAXB_CONTEXT;
     protected static final JAXBContext BUNDLE_JAXB_CONTEXT;
     protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
+    public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList(new String[] { "recordsize",
+        "succeeded-post-processing", "failed-post-processing", "eviction", "jms-messaging", }));
 
     protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
         @Override
@@ -400,4 +407,10 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
             IOUtils.closeQuietly(out);
         }
     }
+
+    protected void decorateWithOozieRetries(ACTION action) {
+        Properties props = RuntimeProperties.get();
+        action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
+        action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bd7053b/oozie/src/main/resources/oozie-workflow-0.3.xsd
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/oozie-workflow-0.3.xsd b/oozie/src/main/resources/oozie-workflow-0.3.xsd
index 012d9f7..996a109 100644
--- a/oozie/src/main/resources/oozie-workflow-0.3.xsd
+++ b/oozie/src/main/resources/oozie-workflow-0.3.xsd
@@ -7,15 +7,15 @@
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
-  -->
+-->
 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:workflow="uri:oozie:workflow:0.3"
            elementFormDefault="qualified" targetNamespace="uri:oozie:workflow:0.3">
 
@@ -29,6 +29,7 @@
 
     <xs:complexType name="WORKFLOW-APP">
         <xs:sequence>
+        	<xs:element name="credentials" type="workflow:CREDENTIALS" minOccurs="0" maxOccurs="1"/>
             <xs:element name="start" type="workflow:START" minOccurs="1" maxOccurs="1"/>
             <xs:choice minOccurs="0" maxOccurs="unbounded">
                 <xs:element name="decision" type="workflow:DECISION" minOccurs="1" maxOccurs="1"/>
@@ -40,7 +41,7 @@
             <xs:element name="end" type="workflow:END" minOccurs="1" maxOccurs="1"/>
             <xs:any namespace="uri:oozie:sla:0.1" minOccurs="0" maxOccurs="1"/>
         </xs:sequence>
-        <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+        <xs:attribute name="name" type="xs:string" use="required"/>
     </xs:complexType>
 
     <xs:complexType name="START">
@@ -130,9 +131,12 @@
             </xs:choice>
             <xs:element name="ok" type="workflow:ACTION_TRANSITION" minOccurs="1" maxOccurs="1"/>
             <xs:element name="error" type="workflow:ACTION_TRANSITION" minOccurs="1" maxOccurs="1"/>
-            <!--<xs:any namespace="uri:oozie:sla:0.1" minOccurs="0" maxOccurs="1"/>-->
+            <!--xs:any namespace="uri:oozie:sla:0.1" minOccurs="0" maxOccurs="1"/-->
         </xs:sequence>
         <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+        <xs:attribute name="cred" type="xs:string"/>
+        <xs:attribute name="retry-max" type="xs:string"/>
+        <xs:attribute name="retry-interval" type="xs:string"/>
     </xs:complexType>
 
     <xs:complexType name="MAP-REDUCE">
@@ -165,7 +169,7 @@
             <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
         </xs:sequence>
     </xs:complexType>
-
+    
     <xs:complexType name="SSH">
         <xs:sequence>
             <xs:element name="host" type="xs:string" minOccurs="1" maxOccurs="1"/>
@@ -184,12 +188,12 @@
     </xs:complexType>
 
     <xs:complexType name="FS">
-        <xs:sequence>
-            <xs:element name="delete" type="workflow:DELETE" minOccurs="0" maxOccurs="unbounded"/>
-            <xs:element name="mkdir" type="workflow:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
-            <xs:element name="move" type="workflow:MOVE" minOccurs="0" maxOccurs="unbounded"/>
-            <xs:element name="chmod" type="workflow:CHMOD" minOccurs="0" maxOccurs="unbounded"/>
-        </xs:sequence>
+	<xs:choice minOccurs="0" maxOccurs="unbounded">
+            <xs:element name="delete" type="workflow:DELETE"/>
+            <xs:element name="mkdir" type="workflow:MKDIR"/>
+            <xs:element name="move" type="workflow:MOVE"/>
+            <xs:element name="chmod" type="workflow:CHMOD"/>
+        </xs:choice>
     </xs:complexType>
 
     <xs:complexType name="JAVA">
@@ -270,5 +274,26 @@
         <xs:attribute name="permissions" type="xs:string" use="required"/>
         <xs:attribute name="dir-files" type="xs:string"/>
     </xs:complexType>
-
+    
+    <xs:complexType name="CREDENTIALS">             
+        <xs:sequence minOccurs="0" maxOccurs="unbounded">
+            <xs:element name="credential" type="workflow:CREDENTIAL"/>
+		</xs:sequence>                
+    </xs:complexType>
+    
+   	<xs:complexType name="CREDENTIAL">
+        <xs:sequence  minOccurs="0" maxOccurs="unbounded" >
+                   <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+                	<xs:complexType>
+	                    <xs:sequence>
+	                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+	                        <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
+	                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+	                    </xs:sequence>
+                	</xs:complexType>
+           		</xs:element>
+        </xs:sequence>
+        <xs:attribute name="name" type="xs:string" use="required"/>
+        <xs:attribute name="type" type="xs:string" use="required"/>
+    </xs:complexType>
 </xs:schema>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bd7053b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index b72e243..87be709 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -519,6 +519,8 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
                 decoratePIGAction(cluster, process, processWorkflow, action.getPig(), parentWfPath);
             } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
                 decorateHiveAction(cluster, process, processWorkflow, action, parentWfPath);
+            } else if (FALCON_ACTIONS.contains(actionName)) {
+                decorateWithOozieRetries(action);
             }
         }
 
@@ -812,4 +814,5 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
             throw new RuntimeException("Unable to marshall hive action.", e);
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2bd7053b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index 224397e..794e585 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -434,6 +434,12 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         Assert.assertEquals("user-oozie-workflow", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
         Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
         Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
+        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryMax());
+        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryInterval());
+        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryMax());
+        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryInterval());
+        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryMax());
+        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryInterval());
     }
 
     private COORDINATORAPP getCoordinator(Path path) throws Exception {