You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/02/19 13:37:57 UTC

git commit: FALCON-242 Post processing is not called in Retention workflows. Contributed by Shaik Idris

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 3c51f1053 -> d5b64722c


FALCON-242 Post processing is not called in Retention workflows. Contributed by Shaik Idris


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/d5b64722
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/d5b64722
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/d5b64722

Branch: refs/heads/master
Commit: d5b64722c451b1cb09dcb11530c2723a33494579
Parents: 3c51f10
Author: shaikidris <ps...@gmail.com>
Authored: Wed Feb 19 18:07:12 2014 +0530
Committer: shaikidris <ps...@gmail.com>
Committed: Wed Feb 19 18:07:12 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../config/workflow/replication-workflow.xml    |   4 +-
 .../config/workflow/retention-workflow.xml      | 116 +++++++++++++++----
 .../falcon/messaging/EntityInstanceMessage.java |   6 +
 .../converter/AbstractOozieEntityMapper.java    |   2 +-
 .../org/apache/falcon/logging/LogMover.java     |   9 +-
 .../falcon/service/FalconTopicSubscriber.java   |  15 ++-
 .../config/workflow/process-parent-workflow.xml |   2 +-
 .../falcon/rerun/handler/LateRerunHandler.java  |  13 ---
 .../apache/falcon/retention/FeedEvictor.java    |   3 +-
 10 files changed, 125 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ecab28a..26b76b0 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -45,6 +45,7 @@ Trunk (Unreleased)
     FALCON-38 Falcon's parent workflow actions (pre-processing & prost-processing) 
     should have multiple retries. (Shaik Idris)
 
+    FALCON-242 Post processing is not called in Retention workflows (Shaik Idris)
 
   OPTIMIZATIONS
     FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index 7a95c35..3ee6f86 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -60,7 +60,7 @@
             <capture-output/>
         </java>
         <ok to="replication-decision"/>
-        <error to="fail"/>
+        <error to="failed-post-processing"/>
     </action>
     <decision name="replication-decision">
         <switch>
@@ -96,7 +96,7 @@
             <param>falconSourceStagingDir=${distcpSourcePaths}</param>
         </hive>
         <ok to="replication"/>
-        <error to="fail"/>
+        <error to="failed-post-processing"/>
     </action>
     <!-- Replication action -->
     <action name="replication">

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/feed/src/main/resources/config/workflow/retention-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/retention-workflow.xml b/feed/src/main/resources/config/workflow/retention-workflow.xml
index 08795b4..87fdc5a 100644
--- a/feed/src/main/resources/config/workflow/retention-workflow.xml
+++ b/feed/src/main/resources/config/workflow/retention-workflow.xml
@@ -56,11 +56,11 @@
             <arg>-logFile</arg>
             <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
         </java>
-        <ok to="jms-messaging"/>
-        <error to="fail"/>
+        <ok to="succeeded-post-processing"/>
+        <error to="failed-post-processing"/>
     </action>
 
-    <action name='jms-messaging'>
+    <action name='succeeded-post-processing'>
         <java>
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
@@ -74,39 +74,114 @@
                     <value>${jobPriority}</value>
                 </property>
             </configuration>
-            <main-class>org.apache.falcon.messaging.MessageProducer</main-class>
+            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+            <arg>-cluster</arg>
+            <arg>${cluster}</arg>
+            <arg>-entityType</arg>
+            <arg>${entityType}</arg>
             <arg>-entityName</arg>
             <arg>${entityName}</arg>
-            <arg>-feedNames</arg>
-            <arg>${feedNames}</arg>
-            <arg>-feedInstancePaths</arg>
-            <arg>${feedInstancePaths}</arg>
+            <arg>-nominalTime</arg>
+            <arg>${nominalTime}</arg>
+            <arg>-operation</arg>
+            <arg>DELETE</arg>
             <arg>-workflowId</arg>
             <arg>${wf:id()}</arg>
             <arg>-runId</arg>
             <arg>${wf:run()}</arg>
-            <arg>-nominalTime</arg>
-            <arg>${nominalTime}</arg>
+            <arg>-status</arg>
+            <arg>SUCCEEDED</arg>
             <arg>-timeStamp</arg>
             <arg>${timeStamp}</arg>
-            <arg>-brokerUrl</arg>
-            <arg>${userBrokerUrl}</arg>
             <arg>-brokerImplClass</arg>
+            <arg>${wf:conf("broker.impl.class")}</arg>
+            <arg>-brokerUrl</arg>
+            <arg>${wf:conf("broker.url")}</arg>
+            <arg>-userBrokerImplClass</arg>
             <arg>${userBrokerImplClass}</arg>
+            <arg>-userBrokerUrl</arg>
+            <arg>${userBrokerUrl}</arg>
+            <arg>-brokerTTL</arg>
+            <arg>${wf:conf("broker.ttlInMins")}</arg>
+            <arg>-feedNames</arg>
+            <arg>${feedNames}</arg>
+            <arg>-feedInstancePaths</arg>
+            <arg>${feedInstancePaths}</arg>
+            <arg>-logFile</arg>
+            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+            <arg>-workflowEngineUrl</arg>
+            <arg>${workflowEngineUrl}</arg>
+            <arg>-subflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-logDir</arg>
+            <arg>${logDir}/job-${nominalTime}/</arg>
+            <arg>-workflowUser</arg>
+            <arg>${wf:user()}</arg>
+            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
+            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
+            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
+            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
+            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
+            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
+        </java>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <action name='failed-post-processing'>
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+            <arg>-cluster</arg>
+            <arg>${cluster}</arg>
             <arg>-entityType</arg>
             <arg>${entityType}</arg>
+            <arg>-entityName</arg>
+            <arg>${entityName}</arg>
+            <arg>-nominalTime</arg>
+            <arg>${nominalTime}</arg>
             <arg>-operation</arg>
             <arg>DELETE</arg>
-            <arg>-logFile</arg>
-            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
-            <arg>-topicName</arg>
-            <arg>FALCON.${entityName}</arg>
+            <arg>-workflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-runId</arg>
+            <arg>${wf:run()}</arg>
             <arg>-status</arg>
-            <arg>SUCCEEDED</arg>
+            <arg>FAILED</arg>
+            <arg>-timeStamp</arg>
+            <arg>${timeStamp}</arg>
+            <arg>-brokerImplClass</arg>
+            <arg>${wf:conf("broker.impl.class")}</arg>
+            <arg>-brokerUrl</arg>
+            <arg>${wf:conf("broker.url")}</arg>
+            <arg>-userBrokerImplClass</arg>
+            <arg>${userBrokerImplClass}</arg>
+            <arg>-userBrokerUrl</arg>
+            <arg>${userBrokerUrl}</arg>
             <arg>-brokerTTL</arg>
             <arg>${wf:conf("broker.ttlInMins")}</arg>
-            <arg>-cluster</arg>
-            <arg>${cluster}</arg>
+            <arg>-feedNames</arg>
+            <arg>${feedNames}</arg>
+            <arg>-feedInstancePaths</arg>
+            <arg>${feedInstancePaths}</arg>
+            <arg>-logFile</arg>
+            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+            <arg>-workflowEngineUrl</arg>
+            <arg>${workflowEngineUrl}</arg>
+            <arg>-subflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-logDir</arg>
+            <arg>${logDir}/job-${nominalTime}/</arg>
             <arg>-workflowUser</arg>
             <arg>${wf:user()}</arg>
             <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
@@ -116,10 +191,9 @@
             <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
             <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
         </java>
-        <ok to="end"/>
+        <ok to="fail"/>
         <error to="fail"/>
     </action>
-
     <kill name="fail">
         <message>
             Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
index d8ba4f3..b6ff504 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
@@ -208,6 +208,12 @@ public class EntityInstanceMessage {
         //else case of feed retention
         Path logFile = new Path(cmd.getOptionValue(ARG.logFile.getArgName()));
         FileSystem fs = FileSystem.get(logFile.toUri(), new Configuration());
+
+        if (!fs.exists(logFile)) {
+            //Evictor Failed without deleting a single path
+            return new String[0];
+        }
+
         ByteArrayOutputStream writer = new ByteArrayOutputStream();
         InputStream instance = fs.open(logFile);
         IOUtils.copyBytes(instance, writer, 4096, true);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
index 0762514..f443939 100644
--- a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
+++ b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
@@ -85,7 +85,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
     protected static final JAXBContext BUNDLE_JAXB_CONTEXT;
     protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
     public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList(new String[] { "recordsize",
-        "succeeded-post-processing", "failed-post-processing", "eviction", "jms-messaging", }));
+        "succeeded-post-processing", "failed-post-processing", }));
 
     protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
index 92b90e7..117aa58 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
@@ -43,7 +43,10 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URL;
 import java.net.URLConnection;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Utility called in the post process of oozie workflow to move oozie action executor log.
@@ -51,6 +54,8 @@ import java.util.List;
 public class LogMover extends Configured implements Tool {
 
     private static final Logger LOG = Logger.getLogger(LogMover.class);
+    public static final Set<String> FALCON_ACTIONS =
+        new HashSet<String>(Arrays.asList(new String[]{"eviction", "replication", }));
 
     /**
      * Args to the command.
@@ -89,12 +94,12 @@ public class LogMover extends Configured implements Tool {
 
             if (args.entityType.equalsIgnoreCase(EntityType.FEED.name())
                     || notUserWorkflowEngineIsOozie(args.userWorkflowEngine)) {
-                // if replication wf or PIG Process
+                // if replication wf, retention wf or PIG Process
                 copyOozieLog(client, fs, path, jobInfo.getId());
 
                 List<WorkflowAction> workflowActions = jobInfo.getActions();
                 for (int i=0; i < workflowActions.size(); i++) {
-                    if (workflowActions.get(i).getName().equals("replication")) {
+                    if (FALCON_ACTIONS.contains(workflowActions.get(i).getName())) {
                         copyTTlogs(fs, path, jobInfo.getActions().get(i));
                         break;
                     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
index 7e2a6c1..c1d629d 100644
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
@@ -20,6 +20,7 @@ package org.apache.falcon.service;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.apache.falcon.rerun.event.RerunEvent.RerunType;
@@ -110,13 +111,19 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
                         SchemaHelper.formatDateUTC(startTime), "", "", duration);
 
             } else if (status.equalsIgnoreCase("SUCCEEDED")) {
-                latedataHandler.handleRerun(cluster, entityType, entityName,
+                Entity entity = EntityUtil.getEntity(entityType, entityName);
+                //late data handling not applicable for feed retention action
+                if (!operation.equalsIgnoreCase("DELETE") && EntityUtil.getLateProcess(entity) != null) {
+                    latedataHandler.handleRerun(cluster, entityType, entityName,
                         nominalTime, runId, workflowId, workflowUser,
                         System.currentTimeMillis());
-
+                } else {
+                    LOG.info("Late date handling not applicable for entityType: " + entityType + ", entityName: "
+                        + entityName + " operation: " + operation);
+                }
                 GenericAlert.instrumentSucceededInstance(cluster, entityType,
-                        entityName, nominalTime, workflowId, workflowUser, runId, operation,
-                        SchemaHelper.formatDateUTC(startTime), duration);
+                    entityName, nominalTime, workflowId, workflowUser, runId, operation,
+                    SchemaHelper.formatDateUTC(startTime), duration);
 
                 notifySLAService(cluster, entityName, entityType, nominalTime, duration);
             }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/process/src/main/resources/config/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/process/src/main/resources/config/workflow/process-parent-workflow.xml b/process/src/main/resources/config/workflow/process-parent-workflow.xml
index f53c1e7..838215f 100644
--- a/process/src/main/resources/config/workflow/process-parent-workflow.xml
+++ b/process/src/main/resources/config/workflow/process-parent-workflow.xml
@@ -60,7 +60,7 @@
             <capture-output/>
         </java>
         <ok to="user-workflow"/>
-        <error to="fail"/>
+        <error to="failed-post-processing"/>
     </action>
 
     <decision name='user-workflow'>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 72f93cb..f1de2cb 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -55,19 +55,6 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                             String runId, String wfId, String workflowUser, long msgReceivedTime) {
         try {
             Entity entity = EntityUtil.getEntity(entityType, entityName);
-            try {
-                if (EntityUtil.getLateProcess(entity) == null
-                        || EntityUtil.getLateProcess(entity).getLateInputs() == null
-                        || EntityUtil.getLateProcess(entity).getLateInputs()
-                        .size() == 0) {
-                    LOG.info("Late rerun not configured for entity: " + entityName);
-                    return;
-                }
-            } catch (FalconException e) {
-                LOG.error("Unable to get Late Process for entity:" + entityName);
-                return;
-            }
-
             int intRunId = Integer.parseInt(runId);
             Date msgInsertTime = EntityUtil.parseDateUTC(nominalTime);
             Long wait = getEventDelay(entity, nominalTime);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index a8db52e..13c447c 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -462,8 +462,7 @@ public class FeedEvictor extends Configured implements Tool {
         if (feedBasePath.equals(parent)) {
             LOG.info("Not deleting feed base path:" + parent);
         } else {
-            FileStatus[] files = fs.listStatus(parent);
-            if (files != null && files.length == 0) {
+            if (fs.getContentSummary(parent).getFileCount() == 0) {
                 LOG.info("Parent path: " + parent + " is empty, deleting path");
                 if (fs.delete(parent, true)) {
                     LOG.info("Deleted empty dir: " + parent);