You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/01/06 11:53:20 UTC

[7/9] falcon git commit: FALCON-1719 Retry does not update the state of the instance in the database

FALCON-1719 Retry does not update the state of the instance in the database


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7cde36c4
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7cde36c4
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7cde36c4

Branch: refs/heads/0.9
Commit: 7cde36c4104bcb43b913cbd1c22288daa773f488
Parents: 5e80dcd
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Wed Jan 6 15:13:59 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Jan 6 15:13:59 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                          |  2 ++
 .../falcon/rerun/handler/AbstractRerunConsumer.java  | 11 ++++++++---
 .../falcon/rerun/handler/AbstractRerunHandler.java   | 15 +++++++++++++--
 .../falcon/rerun/handler/LateRerunConsumer.java      |  9 +++++----
 .../falcon/rerun/handler/LateRerunHandler.java       |  5 ++---
 .../apache/falcon/rerun/handler/RetryConsumer.java   |  4 ++--
 6 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 599efca..2ed1ab4 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -90,6 +90,8 @@ Proposed Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1719 Retry does not update the state of the instance in the database (Pavan Kolamuri via Pallavi Rao)
+
     FALCON-1710 dependency API sets totalResults as 0 by default(Praveen Adlakha via Ajay Yadava)
 
     FALCON-1714 EntityNotRegisteredException when process with no input/output feed is scheduled(Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index 582cb15..f60b927 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -26,6 +26,7 @@ import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
 import org.apache.falcon.rerun.policy.ExpBackoffPolicy;
 import org.apache.falcon.rerun.queue.DelayedQueue;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,9 +75,12 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
 
                 // Login the user to access WfEngine as this user
                 CurrentUser.authenticate(message.getWorkflowUser());
-                String jobStatus = handler.getWfEngine().getWorkflowStatus(
+                AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(),
+                        message.getEntityName());
+                String jobStatus = wfEngine.getWorkflowStatus(
                         message.getClusterName(), message.getWfId());
-                handleRerun(message.getClusterName(), jobStatus, message);
+                handleRerun(message.getClusterName(), jobStatus, message,
+                        message.getEntityType(), message.getEntityName());
 
             } catch (Throwable e) {
                 LOG.error("Error in rerun consumer", e);
@@ -84,5 +88,6 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
         }
     }
 
-    protected abstract void handleRerun(String clusterName, String jobStatus, T message);
+    protected abstract void handleRerun(String clusterName, String jobStatus, T message,
+                                        String entityType, String entityName);
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index 64c566e..bc1f7f2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -17,9 +17,11 @@
  */
 package org.apache.falcon.rerun.handler;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.Retry;
 import org.apache.falcon.rerun.event.RerunEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
@@ -58,8 +60,17 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
                                      String wfId, String workflowUser, long msgReceivedTime);
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
-    public AbstractWorkflowEngine getWfEngine() {
-        return wfEngine;
+    public AbstractWorkflowEngine getWfEngine(String entityType, String entityName) {
+        if (StringUtils.isBlank(entityType) || StringUtils.isBlank(entityName)) {
+            return wfEngine;
+        }
+        try {
+            Entity entity = EntityUtil.getEntity(EntityType.valueOf(entityType), entityName);
+            return WorkflowEngineFactory.getWorkflowEngine(entity);
+        } catch (FalconException e) {
+            // If the entity or its workflow engine cannot be resolved, fall back to the default engine (backward compatibility).
+            return wfEngine;
+        }
     }
 
     public boolean offerToQueue(T event) throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index ee31952..4297788 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -53,7 +53,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
 
     @Override
     protected void handleRerun(String clusterName, String jobStatus,
-                               LaterunEvent message) {
+                               LaterunEvent message, String entityType, String entityName) {
         try {
             if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
                     || jobStatus.equals("SUSPENDED")) {
@@ -77,7 +77,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
 
             LOG.info("Late changes detected in the following feeds: {}", detectLate);
 
-            handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null, true);
+            handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(), null, true);
             LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}",
                     message.getWfId(), message.getClusterName());
         } catch (Exception e) {
@@ -91,8 +91,9 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
 
     public String detectLate(LaterunEvent message) throws Exception {
         LateDataHandler late = new LateDataHandler();
-        Properties properties = handler.getWfEngine().getWorkflowProperties(
-                message.getClusterName(), message.getWfId());
+        AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(),
+                message.getEntityName());
+        Properties properties = wfEngine.getWorkflowProperties(message.getClusterName(), message.getWfId());
         String falconInputs = properties.getProperty(WorkflowExecutionArgs.INPUT_NAMES.getName());
         String falconInPaths = properties.getProperty(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName());
         String falconInputFeedStorageTypes =

http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 64177a4..1d2ed37 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -65,9 +65,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
             Long wait = getEventDelay(entity, nominalTime);
             if (wait == -1) {
                 LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName);
-
-                java.util.Properties properties =
-                        this.getWfEngine().getWorkflowProperties(cluster, wfId);
+                AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName);
+                java.util.Properties properties = wfEngine.getWorkflowProperties(cluster, wfId);
                 String logDir = properties.getProperty("logDir");
                 String srcClusterName = properties.getProperty("srcClusterName");
                 Path lateLogPath = this.getLateLogPath(logDir,

http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 61aa3e1..96300d9 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -39,7 +39,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
 
     @Override
     protected void handleRerun(String clusterName, String jobStatus,
-                               RetryEvent message) {
+                               RetryEvent message, String entityType, String entityName) {
         try {
             if (!jobStatus.equals("KILLED")) {
                 LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as job status is running:"
@@ -52,7 +52,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
                     + " At time: {}",
                     (message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(),
                     message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis())));
-            handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null, false);
+            handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(), null, false);
         } catch (Exception e) {
             int maxFailRetryCount = Integer.parseInt(StartupProperties.get()
                     .getProperty("max.retry.failure.count", "1"));