You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text rendering.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/02/19 13:37:57 UTC
git commit: FALCON-242 Post processing is not called in Retention
workflows. Contributed by Shaik Idris
Repository: incubator-falcon
Updated Branches:
refs/heads/master 3c51f1053 -> d5b64722c
FALCON-242 Post processing is not called in Retention workflows. Contributed by Shaik Idris
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/d5b64722
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/d5b64722
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/d5b64722
Branch: refs/heads/master
Commit: d5b64722c451b1cb09dcb11530c2723a33494579
Parents: 3c51f10
Author: shaikidris <ps...@gmail.com>
Authored: Wed Feb 19 18:07:12 2014 +0530
Committer: shaikidris <ps...@gmail.com>
Committed: Wed Feb 19 18:07:12 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../config/workflow/replication-workflow.xml | 4 +-
.../config/workflow/retention-workflow.xml | 116 +++++++++++++++----
.../falcon/messaging/EntityInstanceMessage.java | 6 +
.../converter/AbstractOozieEntityMapper.java | 2 +-
.../org/apache/falcon/logging/LogMover.java | 9 +-
.../falcon/service/FalconTopicSubscriber.java | 15 ++-
.../config/workflow/process-parent-workflow.xml | 2 +-
.../falcon/rerun/handler/LateRerunHandler.java | 13 ---
.../apache/falcon/retention/FeedEvictor.java | 3 +-
10 files changed, 125 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ecab28a..26b76b0 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -45,6 +45,7 @@ Trunk (Unreleased)
FALCON-38 Falcon's parent workflow actions (pre-processing & prost-processing)
should have multiple retries. (Shaik Idris)
+ FALCON-242 Post processing is not called in Retention workflows (Shaik Idris)
OPTIMIZATIONS
FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index 7a95c35..3ee6f86 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -60,7 +60,7 @@
<capture-output/>
</java>
<ok to="replication-decision"/>
- <error to="fail"/>
+ <error to="failed-post-processing"/>
</action>
<decision name="replication-decision">
<switch>
@@ -96,7 +96,7 @@
<param>falconSourceStagingDir=${distcpSourcePaths}</param>
</hive>
<ok to="replication"/>
- <error to="fail"/>
+ <error to="failed-post-processing"/>
</action>
<!-- Replication action -->
<action name="replication">
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/feed/src/main/resources/config/workflow/retention-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/retention-workflow.xml b/feed/src/main/resources/config/workflow/retention-workflow.xml
index 08795b4..87fdc5a 100644
--- a/feed/src/main/resources/config/workflow/retention-workflow.xml
+++ b/feed/src/main/resources/config/workflow/retention-workflow.xml
@@ -56,11 +56,11 @@
<arg>-logFile</arg>
<arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
</java>
- <ok to="jms-messaging"/>
- <error to="fail"/>
+ <ok to="succeeded-post-processing"/>
+ <error to="failed-post-processing"/>
</action>
- <action name='jms-messaging'>
+ <action name='succeeded-post-processing'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
@@ -74,39 +74,114 @@
<value>${jobPriority}</value>
</property>
</configuration>
- <main-class>org.apache.falcon.messaging.MessageProducer</main-class>
+ <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+ <arg>-cluster</arg>
+ <arg>${cluster}</arg>
+ <arg>-entityType</arg>
+ <arg>${entityType}</arg>
<arg>-entityName</arg>
<arg>${entityName}</arg>
- <arg>-feedNames</arg>
- <arg>${feedNames}</arg>
- <arg>-feedInstancePaths</arg>
- <arg>${feedInstancePaths}</arg>
+ <arg>-nominalTime</arg>
+ <arg>${nominalTime}</arg>
+ <arg>-operation</arg>
+ <arg>DELETE</arg>
<arg>-workflowId</arg>
<arg>${wf:id()}</arg>
<arg>-runId</arg>
<arg>${wf:run()}</arg>
- <arg>-nominalTime</arg>
- <arg>${nominalTime}</arg>
+ <arg>-status</arg>
+ <arg>SUCCEEDED</arg>
<arg>-timeStamp</arg>
<arg>${timeStamp}</arg>
- <arg>-brokerUrl</arg>
- <arg>${userBrokerUrl}</arg>
<arg>-brokerImplClass</arg>
+ <arg>${wf:conf("broker.impl.class")}</arg>
+ <arg>-brokerUrl</arg>
+ <arg>${wf:conf("broker.url")}</arg>
+ <arg>-userBrokerImplClass</arg>
<arg>${userBrokerImplClass}</arg>
+ <arg>-userBrokerUrl</arg>
+ <arg>${userBrokerUrl}</arg>
+ <arg>-brokerTTL</arg>
+ <arg>${wf:conf("broker.ttlInMins")}</arg>
+ <arg>-feedNames</arg>
+ <arg>${feedNames}</arg>
+ <arg>-feedInstancePaths</arg>
+ <arg>${feedInstancePaths}</arg>
+ <arg>-logFile</arg>
+ <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+ <arg>-workflowEngineUrl</arg>
+ <arg>${workflowEngineUrl}</arg>
+ <arg>-subflowId</arg>
+ <arg>${wf:id()}</arg>
+ <arg>-logDir</arg>
+ <arg>${logDir}/job-${nominalTime}/</arg>
+ <arg>-workflowUser</arg>
+ <arg>${wf:user()}</arg>
+ <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
+ <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
+ <file>${wf:conf("falcon.libpath")}/jms.jar</file>
+ <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
+ <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
+ <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
+ </java>
+ <ok to="end"/>
+ <error to="fail"/>
+ </action>
+ <action name='failed-post-processing'>
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+ <arg>-cluster</arg>
+ <arg>${cluster}</arg>
<arg>-entityType</arg>
<arg>${entityType}</arg>
+ <arg>-entityName</arg>
+ <arg>${entityName}</arg>
+ <arg>-nominalTime</arg>
+ <arg>${nominalTime}</arg>
<arg>-operation</arg>
<arg>DELETE</arg>
- <arg>-logFile</arg>
- <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
- <arg>-topicName</arg>
- <arg>FALCON.${entityName}</arg>
+ <arg>-workflowId</arg>
+ <arg>${wf:id()}</arg>
+ <arg>-runId</arg>
+ <arg>${wf:run()}</arg>
<arg>-status</arg>
- <arg>SUCCEEDED</arg>
+ <arg>FAILED</arg>
+ <arg>-timeStamp</arg>
+ <arg>${timeStamp}</arg>
+ <arg>-brokerImplClass</arg>
+ <arg>${wf:conf("broker.impl.class")}</arg>
+ <arg>-brokerUrl</arg>
+ <arg>${wf:conf("broker.url")}</arg>
+ <arg>-userBrokerImplClass</arg>
+ <arg>${userBrokerImplClass}</arg>
+ <arg>-userBrokerUrl</arg>
+ <arg>${userBrokerUrl}</arg>
<arg>-brokerTTL</arg>
<arg>${wf:conf("broker.ttlInMins")}</arg>
- <arg>-cluster</arg>
- <arg>${cluster}</arg>
+ <arg>-feedNames</arg>
+ <arg>${feedNames}</arg>
+ <arg>-feedInstancePaths</arg>
+ <arg>${feedInstancePaths}</arg>
+ <arg>-logFile</arg>
+ <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+ <arg>-workflowEngineUrl</arg>
+ <arg>${workflowEngineUrl}</arg>
+ <arg>-subflowId</arg>
+ <arg>${wf:id()}</arg>
+ <arg>-logDir</arg>
+ <arg>${logDir}/job-${nominalTime}/</arg>
<arg>-workflowUser</arg>
<arg>${wf:user()}</arg>
<file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
@@ -116,10 +191,9 @@
<file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
<file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
</java>
- <ok to="end"/>
+ <ok to="fail"/>
<error to="fail"/>
</action>
-
<kill name="fail">
<message>
Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
index d8ba4f3..b6ff504 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
@@ -208,6 +208,12 @@ public class EntityInstanceMessage {
//else case of feed retention
Path logFile = new Path(cmd.getOptionValue(ARG.logFile.getArgName()));
FileSystem fs = FileSystem.get(logFile.toUri(), new Configuration());
+
+ if (!fs.exists(logFile)) {
+ //Evictor Failed without deleting a single path
+ return new String[0];
+ }
+
ByteArrayOutputStream writer = new ByteArrayOutputStream();
InputStream instance = fs.open(logFile);
IOUtils.copyBytes(instance, writer, 4096, true);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
index 0762514..f443939 100644
--- a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
+++ b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
@@ -85,7 +85,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
protected static final JAXBContext BUNDLE_JAXB_CONTEXT;
protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList(new String[] { "recordsize",
- "succeeded-post-processing", "failed-post-processing", "eviction", "jms-messaging", }));
+ "succeeded-post-processing", "failed-post-processing", }));
protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
index 92b90e7..117aa58 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
@@ -43,7 +43,10 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
/**
* Utility called in the post process of oozie workflow to move oozie action executor log.
@@ -51,6 +54,8 @@ import java.util.List;
public class LogMover extends Configured implements Tool {
private static final Logger LOG = Logger.getLogger(LogMover.class);
+ public static final Set<String> FALCON_ACTIONS =
+ new HashSet<String>(Arrays.asList(new String[]{"eviction", "replication", }));
/**
* Args to the command.
@@ -89,12 +94,12 @@ public class LogMover extends Configured implements Tool {
if (args.entityType.equalsIgnoreCase(EntityType.FEED.name())
|| notUserWorkflowEngineIsOozie(args.userWorkflowEngine)) {
- // if replication wf or PIG Process
+ // if replication wf, retention wf or PIG Process
copyOozieLog(client, fs, path, jobInfo.getId());
List<WorkflowAction> workflowActions = jobInfo.getActions();
for (int i=0; i < workflowActions.size(); i++) {
- if (workflowActions.get(i).getName().equals("replication")) {
+ if (FALCON_ACTIONS.contains(workflowActions.get(i).getName())) {
copyTTlogs(fs, path, jobInfo.getActions().get(i));
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
index 7e2a6c1..c1d629d 100644
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
@@ -20,6 +20,7 @@ package org.apache.falcon.service;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
import org.apache.falcon.rerun.event.RerunEvent.RerunType;
@@ -110,13 +111,19 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
SchemaHelper.formatDateUTC(startTime), "", "", duration);
} else if (status.equalsIgnoreCase("SUCCEEDED")) {
- latedataHandler.handleRerun(cluster, entityType, entityName,
+ Entity entity = EntityUtil.getEntity(entityType, entityName);
+ //late data handling not applicable for feed retention action
+ if (!operation.equalsIgnoreCase("DELETE") && EntityUtil.getLateProcess(entity) != null) {
+ latedataHandler.handleRerun(cluster, entityType, entityName,
nominalTime, runId, workflowId, workflowUser,
System.currentTimeMillis());
-
+ } else {
+ LOG.info("Late date handling not applicable for entityType: " + entityType + ", entityName: "
+ + entityName + " operation: " + operation);
+ }
GenericAlert.instrumentSucceededInstance(cluster, entityType,
- entityName, nominalTime, workflowId, workflowUser, runId, operation,
- SchemaHelper.formatDateUTC(startTime), duration);
+ entityName, nominalTime, workflowId, workflowUser, runId, operation,
+ SchemaHelper.formatDateUTC(startTime), duration);
notifySLAService(cluster, entityName, entityType, nominalTime, duration);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/process/src/main/resources/config/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/process/src/main/resources/config/workflow/process-parent-workflow.xml b/process/src/main/resources/config/workflow/process-parent-workflow.xml
index f53c1e7..838215f 100644
--- a/process/src/main/resources/config/workflow/process-parent-workflow.xml
+++ b/process/src/main/resources/config/workflow/process-parent-workflow.xml
@@ -60,7 +60,7 @@
<capture-output/>
</java>
<ok to="user-workflow"/>
- <error to="fail"/>
+ <error to="failed-post-processing"/>
</action>
<decision name='user-workflow'>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 72f93cb..f1de2cb 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -55,19 +55,6 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
String runId, String wfId, String workflowUser, long msgReceivedTime) {
try {
Entity entity = EntityUtil.getEntity(entityType, entityName);
- try {
- if (EntityUtil.getLateProcess(entity) == null
- || EntityUtil.getLateProcess(entity).getLateInputs() == null
- || EntityUtil.getLateProcess(entity).getLateInputs()
- .size() == 0) {
- LOG.info("Late rerun not configured for entity: " + entityName);
- return;
- }
- } catch (FalconException e) {
- LOG.error("Unable to get Late Process for entity:" + entityName);
- return;
- }
-
int intRunId = Integer.parseInt(runId);
Date msgInsertTime = EntityUtil.parseDateUTC(nominalTime);
Long wait = getEventDelay(entity, nominalTime);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d5b64722/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index a8db52e..13c447c 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -462,8 +462,7 @@ public class FeedEvictor extends Configured implements Tool {
if (feedBasePath.equals(parent)) {
LOG.info("Not deleting feed base path:" + parent);
} else {
- FileStatus[] files = fs.listStatus(parent);
- if (files != null && files.length == 0) {
+ if (fs.getContentSummary(parent).getFileCount() == 0) {
LOG.info("Parent path: " + parent + " is empty, deleting path");
if (fs.delete(parent, true)) {
LOG.info("Deleted empty dir: " + parent);