You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2018/02/26 05:40:44 UTC

falcon git commit: FALCON-2324 Retry, LateRerun and JobLogMover services not working with Kerberos enabled

Repository: falcon
Updated Branches:
  refs/heads/master dc470882f -> d7a4eeb4e


FALCON-2324 Retry, LateRerun and JobLogMover services not working with Kerberos enabled

Author: sandeep <sa...@gmail.com>
Author: sandeep.samudrala <sa...@inmobi.com>
Author: Sandeep Samudrala <sa...@gmail.com>

Reviewers: @pallavi-rao

Closes #399 from sandeepSamudrala/master and squashes the following commits:

974dbbe80 [Sandeep Samudrala] FALCON-2324 Retry, LateRerun and JobLogMover services not working with Kerberos enabled
678df54d1 [Sandeep Samudrala] Merge branch 'master' of https://github.com/apache/falcon
9c907efdb [sandeep.samudrala] FALCON-2319. Falcon Build failure fix for enunciate
b5fb5786c [sandeep.samudrala] git applyMerge branch 'master' of https://github.com/apache/falcon
575e76866 [sandeep.samudrala] Merge branch 'master' of https://github.com/apache/falcon
e0ad35884 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084f6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e93f [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081ff [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afaca [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc4609 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e98d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a17880526 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bfaa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c72 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c06556623 [sandeep] reverting last line changes made
1a4dcd234 [sandeep] rebased and resolved the conflicts from master
271318b9c [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe07 [sandeep] rebasing from master
9e68a5783 [sandeep] FALCON-298. Feed update with replication delay creates holes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d7a4eeb4
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d7a4eeb4
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d7a4eeb4

Branch: refs/heads/master
Commit: d7a4eeb4ea19f731a984f40a80cb081995e57521
Parents: dc47088
Author: sandeep <sa...@gmail.com>
Authored: Mon Feb 26 11:10:38 2018 +0530
Committer: pallavi-rao <pa...@inmobi.com>
Committed: Mon Feb 26 11:10:38 2018 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/security/CurrentUser.java |   8 +-
 .../apache/falcon/security/HostnameFilter.java  | 105 +++++++++++++++++++
 .../org/apache/falcon/logging/JobLogMover.java  |  10 +-
 .../workflow/engine/OozieClientFactory.java     |   2 +-
 .../apache/oozie/client/ProxyOozieClient.java   |  20 ++++
 .../apache/falcon/security/HostnameFilter.java  | 105 -------------------
 .../rerun/handler/AbstractRerunConsumer.java    |   2 +-
 .../rerun/handler/AbstractRerunHandler.java     |   9 +-
 .../falcon/rerun/handler/LateRerunConsumer.java |   5 +-
 .../falcon/rerun/handler/LateRerunHandler.java  |   2 +-
 .../falcon/rerun/handler/RetryConsumer.java     |   4 +-
 11 files changed, 148 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index e7c1594..63e599a 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -72,10 +72,8 @@ public final class CurrentUser {
      *
      * @param doAsUser doAs user
      * @param proxyHost proxy host
-     * @throws IOException
      */
-    public static void proxyDoAsUser(final String doAsUser,
-                                     final String proxyHost) throws IOException {
+    public static void proxyDoAsUser(final String doAsUser, final String proxyHost) {
         if (!isAuthenticated()) {
             throw new IllegalStateException("Authentication not done");
         }
@@ -106,10 +104,8 @@ public final class CurrentUser {
      *
      * @param aclOwner entity acl owner
      * @param aclGroup entity acl group
-     * @throws IOException
      */
-    public static void proxy(final String aclOwner,
-                             final String aclGroup) throws IOException {
+    public static void proxy(final String aclOwner, final String aclGroup) {
         if (!isAuthenticated() || StringUtils.isEmpty(aclOwner)) {
             throw new IllegalStateException("Authentication not done or Bad user name");
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/common/src/main/java/org/apache/falcon/security/HostnameFilter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/HostnameFilter.java b/common/src/main/java/org/apache/falcon/security/HostnameFilter.java
new file mode 100644
index 0000000..19e7bf4
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/security/HostnameFilter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Filter that resolves the requester hostname.
+ */
+public class HostnameFilter implements Filter {
+    private static final Logger LOG = LoggerFactory.getLogger(HostnameFilter.class);
+
+    static final ThreadLocal<String> HOSTNAME_TL = new ThreadLocal<>();
+
+    /**
+     * Initializes the filter.
+     *
+     * @param config filter configuration.
+     *
+     * @throws javax.servlet.ServletException thrown if the filter could not be initialized.
+     */
+    @Override
+    public void init(FilterConfig config) throws ServletException {
+    }
+
+    /**
+     * Resolves the requester hostname and delegates the request to the chain.
+     * <p>
+     * The requester hostname is available via the {@link #get} method.
+     *
+     * @param request servlet request.
+     * @param response servlet response.
+     * @param chain filter chain.
+     *
+     * @throws java.io.IOException thrown if an IO error occurs.
+     * @throws ServletException thrown if a servlet error occurs.
+     */
+    @Override
+    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
+        throws IOException, ServletException {
+        try {
+            String hostname;
+            try {
+                String address = request.getRemoteAddr();
+                if (address != null) {
+                    hostname = InetAddress.getByName(address).getCanonicalHostName();
+                } else {
+                    LOG.warn("Request remote address is NULL");
+                    hostname = "???";
+                }
+            } catch (UnknownHostException ex) {
+                LOG.warn("Request remote address could not be resolved, {}", ex.toString(), ex);
+                hostname = "???";
+            }
+            HOSTNAME_TL.set(hostname);
+            chain.doFilter(request, response);
+        } finally {
+            HOSTNAME_TL.remove();
+        }
+    }
+
+    /**
+     * Returns the requester hostname.
+     *
+     * @return the requester hostname.
+     */
+    public static String get() {
+        return HOSTNAME_TL.get();
+    }
+
+    /**
+     * Destroys the filter.
+     * <p>
+     * This implementation is a NOP.
+     */
+    @Override
+    public void destroy() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 5023db3..e80967f 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -23,13 +23,14 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.HostnameFilter;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.engine.OozieClientFactory;
 import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.OozieClientException;
@@ -72,10 +73,6 @@ public class JobLogMover {
     }
 
     public void moveLog(WorkflowExecutionContext context){
-        if (UserGroupInformation.isSecurityEnabled()) {
-            LOG.info("Unable to move logs as security is enabled.");
-            return;
-        }
         try {
             run(context);
         } catch (Exception ignored) {
@@ -95,10 +92,11 @@ public class JobLogMover {
             String instanceOwner = context.getWorkflowUser();
             if (StringUtils.isNotBlank(instanceOwner)) {
                 CurrentUser.authenticate(instanceOwner);
+                CurrentUser.proxyDoAsUser(instanceOwner, HostnameFilter.get());
             } else {
                 CurrentUser.authenticate(System.getProperty("user.name"));
             }
-            OozieClient client = new OozieClient(engineUrl);
+            OozieClient client = OozieClientFactory.getClientRef(engineUrl);
             WorkflowJob jobInfo;
             try {
                 jobInfo = client.getJobInfo(context.getWorkflowId());

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
index 3380b1a..278b0c4 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
@@ -55,7 +55,7 @@ public final class OozieClientFactory {
         return get((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, clusterName));
     }
 
-    private static OozieClient getClientRef(String oozieUrl)
+    public static OozieClient getClientRef(String oozieUrl)
         throws FalconException {
 
         if (OozieConstants.LOCAL_OOZIE.equals(oozieUrl)) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
index eaf3ede..fca6137 100644
--- a/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
@@ -489,6 +489,26 @@ public class ProxyOozieClient extends AuthOozieClient {
     }
 
     @Override
+    public List<CoordinatorAction> reRunCoord(final String jobId, final String rerunType, final String scope,
+                                              final boolean refresh, final boolean noCleanup,
+                                              final boolean failed, final Properties props)
+        throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<List<CoordinatorAction>>() {
+
+                public List<CoordinatorAction> call() throws Exception {
+                    return ProxyOozieClient.super.reRunCoord(jobId, rerunType, scope, refresh, noCleanup, failed,
+                            props);
+                }
+            });
+        } catch (OozieClientException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new OozieClientException(e.toString(), e);
+        }
+    }
+
+    @Override
     public Void reRunBundle(final String jobId, final String coordScope, final String dateScope,
                             final boolean refresh, final boolean noCleanup)
         throws OozieClientException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/prism/src/main/java/org/apache/falcon/security/HostnameFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/HostnameFilter.java b/prism/src/main/java/org/apache/falcon/security/HostnameFilter.java
deleted file mode 100644
index 19e7bf4..0000000
--- a/prism/src/main/java/org/apache/falcon/security/HostnameFilter.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import javax.servlet.Filter;
-import javax.servlet.FilterChain;
-import javax.servlet.FilterConfig;
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Filter that resolves the requester hostname.
- */
-public class HostnameFilter implements Filter {
-    private static final Logger LOG = LoggerFactory.getLogger(HostnameFilter.class);
-
-    static final ThreadLocal<String> HOSTNAME_TL = new ThreadLocal<>();
-
-    /**
-     * Initializes the filter.
-     *
-     * @param config filter configuration.
-     *
-     * @throws javax.servlet.ServletException thrown if the filter could not be initialized.
-     */
-    @Override
-    public void init(FilterConfig config) throws ServletException {
-    }
-
-    /**
-     * Resolves the requester hostname and delegates the request to the chain.
-     * <p>
-     * The requester hostname is available via the {@link #get} method.
-     *
-     * @param request servlet request.
-     * @param response servlet response.
-     * @param chain filter chain.
-     *
-     * @throws java.io.IOException thrown if an IO error occurs.
-     * @throws ServletException thrown if a servlet error occurs.
-     */
-    @Override
-    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
-        throws IOException, ServletException {
-        try {
-            String hostname;
-            try {
-                String address = request.getRemoteAddr();
-                if (address != null) {
-                    hostname = InetAddress.getByName(address).getCanonicalHostName();
-                } else {
-                    LOG.warn("Request remote address is NULL");
-                    hostname = "???";
-                }
-            } catch (UnknownHostException ex) {
-                LOG.warn("Request remote address could not be resolved, {}", ex.toString(), ex);
-                hostname = "???";
-            }
-            HOSTNAME_TL.set(hostname);
-            chain.doFilter(request, response);
-        } finally {
-            HOSTNAME_TL.remove();
-        }
-    }
-
-    /**
-     * Returns the requester hostname.
-     *
-     * @return the requester hostname.
-     */
-    public static String get() {
-        return HOSTNAME_TL.get();
-    }
-
-    /**
-     * Destroys the filter.
-     * <p>
-     * This implementation is a NOP.
-     */
-    @Override
-    public void destroy() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index 000fd55..8a75754 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -77,7 +77,7 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
                 // Login the user to access WfEngine as this user
                 CurrentUser.authenticate(message.getWorkflowUser());
                 AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(),
-                        message.getEntityName());
+                        message.getEntityName(), message.getWorkflowUser());
                 String jobStatus = wfEngine.getWorkflowStatus(
                         message.getClusterName(), message.getWfId());
                 handleRerun(message.getClusterName(), jobStatus, message,

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index 700095e..bfeb6c3 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -25,6 +25,8 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.Retry;
 import org.apache.falcon.rerun.event.RerunEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.HostnameFilter;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.falcon.workflow.WorkflowExecutionListener;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
@@ -60,7 +62,12 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
                                      String wfId, String parentId, String workflowUser, long msgReceivedTime);
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
-    public AbstractWorkflowEngine getWfEngine(String entityType, String entityName) throws FalconException {
+    public AbstractWorkflowEngine getWfEngine(String entityType, String entityName, String doAsUser)
+        throws FalconException {
+        if (StringUtils.isNotBlank(doAsUser)) {
+            CurrentUser.authenticate(doAsUser);
+            CurrentUser.proxyDoAsUser(doAsUser, HostnameFilter.get());
+        }
         if (StringUtils.isBlank(entityType) || StringUtils.isBlank(entityName)) {
             return wfEngine;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 98db379..1ba5a65 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -85,7 +85,8 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
             if (StringUtils.isBlank(id)) {
                 id = message.getWfId();
             }
-            handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, true);
+            handler.getWfEngine(entityType, entityName, message.getWorkflowUser())
+                    .reRun(message.getClusterName(), id, null, true);
             LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}",
                     message.getWfId(), message.getClusterName());
         } catch (Exception e) {
@@ -106,7 +107,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
     public String detectLate(LaterunEvent message) throws Exception {
         LateDataHandler late = new LateDataHandler();
         AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(),
-                message.getEntityName());
+                message.getEntityName(), message.getWorkflowUser());
         Properties properties = wfEngine.getWorkflowProperties(message.getClusterName(), message.getWfId());
         String falconInputs = properties.getProperty(WorkflowExecutionArgs.INPUT_NAMES.getName());
         String falconInPaths = properties.getProperty(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName());

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 02ab792..0be0b25 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -66,7 +66,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
             Long wait = getEventDelay(entity, nominalTime);
             if (wait == -1) {
                 LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName);
-                AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName);
+                AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName, entity.getACL().getOwner());
                 java.util.Properties properties = wfEngine.getWorkflowProperties(cluster, wfId);
                 String logDir = properties.getProperty("logDir");
                 String srcClusterName = properties.getProperty("srcClusterName");

http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 3cad362..7ee2337 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -63,7 +63,9 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
             if (!id.contains("-C@") && StringUtils.isNotBlank(message.getParentId())) {
                 id = message.getParentId();
             }
-            handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, false);
+
+            handler.getWfEngine(entityType, entityName, message.getWorkflowUser())
+                    .reRun(message.getClusterName(), id, null, false);
         } catch (Exception e) {
             if (e instanceof EntityNotRegisteredException) {
                 LOG.warn("Entity {} of type {} doesn't exist in config store. So retry "