You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2018/02/26 05:40:44 UTC
falcon git commit: FALCON-2324 Retry,
Latererun and JobLog mover services not working with kerberos enabled
Repository: falcon
Updated Branches:
refs/heads/master dc470882f -> d7a4eeb4e
FALCON-2324 Retry, Latererun and JobLog mover services not working with kerberos enabled
Author: sandeep <sa...@gmail.com>
Author: sandeep.samudrala <sa...@inmobi.com>
Author: Sandeep Samudrala <sa...@gmail.com>
Reviewers: @pallavi-rao
Closes #399 from sandeepSamudrala/master and squashes the following commits:
974dbbe80 [Sandeep Samudrala] FALCON-2324 Retry, Latererun and JobLog mover services not working with kerberos enabled
678df54d1 [Sandeep Samudrala] Merge branch 'master' of https://github.com/apache/falcon
9c907efdb [sandeep.samudrala] FALCON-2319. Falcon Build failure fix for enunciate
b5fb5786c [sandeep.samudrala] git applyMerge branch 'master' of https://github.com/apache/falcon
575e76866 [sandeep.samudrala] Merge branch 'master' of https://github.com/apache/falcon
e0ad35884 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084f6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e93f [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081ff [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afaca [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc4609 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e98d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a17880526 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bfaa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c72 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c06556623 [sandeep] reverting last line changes made
1a4dcd234 [sandeep] rebased and resolved the conflicts from master
271318b9c [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe07 [sandeep] rebasing from master
9e68a5783 [sandeep] FALCON-298. Feed update with replication delay creates holes
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d7a4eeb4
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d7a4eeb4
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d7a4eeb4
Branch: refs/heads/master
Commit: d7a4eeb4ea19f731a984f40a80cb081995e57521
Parents: dc47088
Author: sandeep <sa...@gmail.com>
Authored: Mon Feb 26 11:10:38 2018 +0530
Committer: pallavi-rao <pa...@inmobi.com>
Committed: Mon Feb 26 11:10:38 2018 +0530
----------------------------------------------------------------------
.../org/apache/falcon/security/CurrentUser.java | 8 +-
.../apache/falcon/security/HostnameFilter.java | 105 +++++++++++++++++++
.../org/apache/falcon/logging/JobLogMover.java | 10 +-
.../workflow/engine/OozieClientFactory.java | 2 +-
.../apache/oozie/client/ProxyOozieClient.java | 20 ++++
.../apache/falcon/security/HostnameFilter.java | 105 -------------------
.../rerun/handler/AbstractRerunConsumer.java | 2 +-
.../rerun/handler/AbstractRerunHandler.java | 9 +-
.../falcon/rerun/handler/LateRerunConsumer.java | 5 +-
.../falcon/rerun/handler/LateRerunHandler.java | 2 +-
.../falcon/rerun/handler/RetryConsumer.java | 4 +-
11 files changed, 148 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index e7c1594..63e599a 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -72,10 +72,8 @@ public final class CurrentUser {
*
* @param doAsUser doAs user
* @param proxyHost proxy host
- * @throws IOException
*/
- public static void proxyDoAsUser(final String doAsUser,
- final String proxyHost) throws IOException {
+ public static void proxyDoAsUser(final String doAsUser, final String proxyHost) {
if (!isAuthenticated()) {
throw new IllegalStateException("Authentication not done");
}
@@ -106,10 +104,8 @@ public final class CurrentUser {
*
* @param aclOwner entity acl owner
* @param aclGroup entity acl group
- * @throws IOException
*/
- public static void proxy(final String aclOwner,
- final String aclGroup) throws IOException {
+ public static void proxy(final String aclOwner, final String aclGroup) {
if (!isAuthenticated() || StringUtils.isEmpty(aclOwner)) {
throw new IllegalStateException("Authentication not done or Bad user name");
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/common/src/main/java/org/apache/falcon/security/HostnameFilter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/HostnameFilter.java b/common/src/main/java/org/apache/falcon/security/HostnameFilter.java
new file mode 100644
index 0000000..19e7bf4
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/security/HostnameFilter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Filter that resolves the requester hostname.
+ */
+public class HostnameFilter implements Filter {
+ private static final Logger LOG = LoggerFactory.getLogger(HostnameFilter.class);
+
+ static final ThreadLocal<String> HOSTNAME_TL = new ThreadLocal<>();
+
+ /**
+ * Initializes the filter.
+ *
+ * @param config filter configuration.
+ *
+ * @throws javax.servlet.ServletException thrown if the filter could not be initialized.
+ */
+ @Override
+ public void init(FilterConfig config) throws ServletException {
+ }
+
+ /**
+ * Resolves the requester hostname and delegates the request to the chain.
+ * <p>
+ * The requester hostname is available via the {@link #get} method.
+ *
+ * @param request servlet request.
+ * @param response servlet response.
+ * @param chain filter chain.
+ *
+ * @throws java.io.IOException thrown if an IO error occurs.
+ * @throws ServletException thrown if a servlet error occurs.
+ */
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
+ throws IOException, ServletException {
+ try {
+ String hostname;
+ try {
+ String address = request.getRemoteAddr();
+ if (address != null) {
+ hostname = InetAddress.getByName(address).getCanonicalHostName();
+ } else {
+ LOG.warn("Request remote address is NULL");
+ hostname = "???";
+ }
+ } catch (UnknownHostException ex) {
+ LOG.warn("Request remote address could not be resolved, {}", ex.toString(), ex);
+ hostname = "???";
+ }
+ HOSTNAME_TL.set(hostname);
+ chain.doFilter(request, response);
+ } finally {
+ HOSTNAME_TL.remove();
+ }
+ }
+
+ /**
+ * Returns the requester hostname.
+ *
+ * @return the requester hostname.
+ */
+ public static String get() {
+ return HOSTNAME_TL.get();
+ }
+
+ /**
+ * Destroys the filter.
+ * <p>
+ * This implementation is a NOP.
+ */
+ @Override
+ public void destroy() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 5023db3..e80967f 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -23,13 +23,14 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.HostnameFilter;
import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.engine.OozieClientFactory;
import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
@@ -72,10 +73,6 @@ public class JobLogMover {
}
public void moveLog(WorkflowExecutionContext context){
- if (UserGroupInformation.isSecurityEnabled()) {
- LOG.info("Unable to move logs as security is enabled.");
- return;
- }
try {
run(context);
} catch (Exception ignored) {
@@ -95,10 +92,11 @@ public class JobLogMover {
String instanceOwner = context.getWorkflowUser();
if (StringUtils.isNotBlank(instanceOwner)) {
CurrentUser.authenticate(instanceOwner);
+ CurrentUser.proxyDoAsUser(instanceOwner, HostnameFilter.get());
} else {
CurrentUser.authenticate(System.getProperty("user.name"));
}
- OozieClient client = new OozieClient(engineUrl);
+ OozieClient client = OozieClientFactory.getClientRef(engineUrl);
WorkflowJob jobInfo;
try {
jobInfo = client.getJobInfo(context.getWorkflowId());
http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
index 3380b1a..278b0c4 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
@@ -55,7 +55,7 @@ public final class OozieClientFactory {
return get((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, clusterName));
}
- private static OozieClient getClientRef(String oozieUrl)
+ public static OozieClient getClientRef(String oozieUrl)
throws FalconException {
if (OozieConstants.LOCAL_OOZIE.equals(oozieUrl)) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
index eaf3ede..fca6137 100644
--- a/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
@@ -489,6 +489,26 @@ public class ProxyOozieClient extends AuthOozieClient {
}
@Override
+ public List<CoordinatorAction> reRunCoord(final String jobId, final String rerunType, final String scope,
+ final boolean refresh, final boolean noCleanup,
+ final boolean failed, final Properties props)
+ throws OozieClientException {
+ try {
+ return doAs(CurrentUser.getUser(), new Callable<List<CoordinatorAction>>() {
+
+ public List<CoordinatorAction> call() throws Exception {
+ return ProxyOozieClient.super.reRunCoord(jobId, rerunType, scope, refresh, noCleanup, failed,
+ props);
+ }
+ });
+ } catch (OozieClientException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new OozieClientException(e.toString(), e);
+ }
+ }
+
+ @Override
public Void reRunBundle(final String jobId, final String coordScope, final String dateScope,
final boolean refresh, final boolean noCleanup)
throws OozieClientException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/prism/src/main/java/org/apache/falcon/security/HostnameFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/HostnameFilter.java b/prism/src/main/java/org/apache/falcon/security/HostnameFilter.java
deleted file mode 100644
index 19e7bf4..0000000
--- a/prism/src/main/java/org/apache/falcon/security/HostnameFilter.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import javax.servlet.Filter;
-import javax.servlet.FilterChain;
-import javax.servlet.FilterConfig;
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Filter that resolves the requester hostname.
- */
-public class HostnameFilter implements Filter {
- private static final Logger LOG = LoggerFactory.getLogger(HostnameFilter.class);
-
- static final ThreadLocal<String> HOSTNAME_TL = new ThreadLocal<>();
-
- /**
- * Initializes the filter.
- *
- * @param config filter configuration.
- *
- * @throws javax.servlet.ServletException thrown if the filter could not be initialized.
- */
- @Override
- public void init(FilterConfig config) throws ServletException {
- }
-
- /**
- * Resolves the requester hostname and delegates the request to the chain.
- * <p>
- * The requester hostname is available via the {@link #get} method.
- *
- * @param request servlet request.
- * @param response servlet response.
- * @param chain filter chain.
- *
- * @throws java.io.IOException thrown if an IO error occurs.
- * @throws ServletException thrown if a servlet error occurs.
- */
- @Override
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
- throws IOException, ServletException {
- try {
- String hostname;
- try {
- String address = request.getRemoteAddr();
- if (address != null) {
- hostname = InetAddress.getByName(address).getCanonicalHostName();
- } else {
- LOG.warn("Request remote address is NULL");
- hostname = "???";
- }
- } catch (UnknownHostException ex) {
- LOG.warn("Request remote address could not be resolved, {}", ex.toString(), ex);
- hostname = "???";
- }
- HOSTNAME_TL.set(hostname);
- chain.doFilter(request, response);
- } finally {
- HOSTNAME_TL.remove();
- }
- }
-
- /**
- * Returns the requester hostname.
- *
- * @return the requester hostname.
- */
- public static String get() {
- return HOSTNAME_TL.get();
- }
-
- /**
- * Destroys the filter.
- * <p>
- * This implementation is a NOP.
- */
- @Override
- public void destroy() {
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
index 000fd55..8a75754 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java
@@ -77,7 +77,7 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst
// Login the user to access WfEngine as this user
CurrentUser.authenticate(message.getWorkflowUser());
AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(),
- message.getEntityName());
+ message.getEntityName(), message.getWorkflowUser());
String jobStatus = wfEngine.getWorkflowStatus(
message.getClusterName(), message.getWfId());
handleRerun(message.getClusterName(), jobStatus, message,
http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index 700095e..bfeb6c3 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -25,6 +25,8 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.Retry;
import org.apache.falcon.rerun.event.RerunEvent;
import org.apache.falcon.rerun.queue.DelayedQueue;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.HostnameFilter;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.WorkflowExecutionListener;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
@@ -60,7 +62,12 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
String wfId, String parentId, String workflowUser, long msgReceivedTime);
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
- public AbstractWorkflowEngine getWfEngine(String entityType, String entityName) throws FalconException {
+ public AbstractWorkflowEngine getWfEngine(String entityType, String entityName, String doAsUser)
+ throws FalconException {
+ if (StringUtils.isNotBlank(doAsUser)) {
+ CurrentUser.authenticate(doAsUser);
+ CurrentUser.proxyDoAsUser(doAsUser, HostnameFilter.get());
+ }
if (StringUtils.isBlank(entityType) || StringUtils.isBlank(entityName)) {
return wfEngine;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 98db379..1ba5a65 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -85,7 +85,8 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
if (StringUtils.isBlank(id)) {
id = message.getWfId();
}
- handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, true);
+ handler.getWfEngine(entityType, entityName, message.getWorkflowUser())
+ .reRun(message.getClusterName(), id, null, true);
LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}",
message.getWfId(), message.getClusterName());
} catch (Exception e) {
@@ -106,7 +107,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
public String detectLate(LaterunEvent message) throws Exception {
LateDataHandler late = new LateDataHandler();
AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(),
- message.getEntityName());
+ message.getEntityName(), message.getWorkflowUser());
Properties properties = wfEngine.getWorkflowProperties(message.getClusterName(), message.getWfId());
String falconInputs = properties.getProperty(WorkflowExecutionArgs.INPUT_NAMES.getName());
String falconInPaths = properties.getProperty(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName());
http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 02ab792..0be0b25 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -66,7 +66,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
Long wait = getEventDelay(entity, nominalTime);
if (wait == -1) {
LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName);
- AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName);
+ AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName, entity.getACL().getOwner());
java.util.Properties properties = wfEngine.getWorkflowProperties(cluster, wfId);
String logDir = properties.getProperty("logDir");
String srcClusterName = properties.getProperty("srcClusterName");
http://git-wip-us.apache.org/repos/asf/falcon/blob/d7a4eeb4/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
index 3cad362..7ee2337 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java
@@ -63,7 +63,9 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>>
if (!id.contains("-C@") && StringUtils.isNotBlank(message.getParentId())) {
id = message.getParentId();
}
- handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, false);
+
+ handler.getWfEngine(entityType, entityName, message.getWorkflowUser())
+ .reRun(message.getClusterName(), id, null, false);
} catch (Exception e) {
if (e instanceof EntityNotRegisteredException) {
LOG.warn("Entity {} of type {} doesn't exist in config store. So retry "