You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/09/20 10:17:35 UTC

[2/2] oozie git commit: OOZIE-2909 LauncherAM: rewrite UGI calls (gezapeti)

OOZIE-2909 LauncherAM: rewrite UGI calls (gezapeti)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9cb4bd05
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9cb4bd05
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9cb4bd05

Branch: refs/heads/master
Commit: 9cb4bd05aa4a86d2193efe5f7bb9ef53f8fc33d2
Parents: d135b88
Author: Gezapeti Cseh <ge...@apache.org>
Authored: Wed Sep 20 12:17:27 2017 +0200
Committer: Gezapeti Cseh <ge...@apache.org>
Committed: Wed Sep 20 12:17:27 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/oozie/ErrorCode.java   |   3 +
 .../action/hadoop/CredentialsProperties.java    |   5 +
 .../hadoop/CredentialsProviderFactory.java      |  17 +-
 .../action/hadoop/HCatCredentialHelper.java     |   5 +-
 .../oozie/action/hadoop/HCatCredentials.java    |   2 +-
 .../oozie/action/hadoop/HDFSCredentials.java    |  61 +++
 .../oozie/action/hadoop/HadoopTokenHelper.java  |  85 +++++
 .../oozie/action/hadoop/HbaseCredentials.java   |   4 +-
 .../oozie/action/hadoop/Hive2Credentials.java   |  14 +-
 .../oozie/action/hadoop/JHSCredentials.java     | 119 ++++++
 .../oozie/action/hadoop/JavaActionExecutor.java | 203 +++-------
 .../oozie/action/hadoop/YarnRMCredentials.java  |  67 ++++
 .../oozie/service/HadoopAccessorService.java    |  91 +----
 .../action/hadoop/TestHadoopTokenHelper.java    |  44 +++
 .../action/hadoop/TestJavaActionExecutor.java   |   1 -
 .../oozie/action/hadoop/TestShellMain.java      |   2 +-
 .../wf/TestWorkflowActionKillXCommand.java      |   2 +-
 .../service/TestHadoopAccessorService.java      |  31 --
 release-log.txt                                 |   1 +
 .../action/hadoop/AMRMClientAsyncFactory.java   |   4 +-
 .../oozie/action/hadoop/HdfsOperations.java     |  84 ++---
 .../apache/oozie/action/hadoop/LauncherAM.java  | 367 ++++++++++---------
 .../apache/oozie/action/hadoop/ShellMain.java   |   1 -
 .../oozie/action/hadoop/TestHdfsOperations.java |  11 -
 .../oozie/action/hadoop/TestLauncherAM.java     |  36 +-
 25 files changed, 728 insertions(+), 532 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/ErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java
index 662e1ed..168c4fa 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -89,6 +89,9 @@ public enum ErrorCode {
     E0508(XLog.OPS, "User [{0}] not authorized for WF job [{1}]"),
     E0509(XLog.OPS, "User [{0}] not authorized for Coord job [{1}]"),
     E0510(XLog.OPS, "Unable to get Credential [{0}]"),
+    E0511(XLog.STD, "No HDFS delegation token present, can''t set credentials. [serverPrincipal={0}]"),
+    E0512(XLog.STD, "Could not get RM delegation token: {0}"),
+    E0513(XLog.STD, "No YARN renewer present, can''t get token. [servicePrincipal={0}]"),
 
     E0550(XLog.OPS, "Could not normalize host name [{0}], {1}"),
     E0551(XLog.OPS, "Missing [{0}] property"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java b/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java
index 20f93ce..3dea787 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java
@@ -84,4 +84,9 @@ public class CredentialsProperties {
     public void setProperties(HashMap<String, String> properties) {
         this.properties = properties;
     }
+
+    @Override
+    public String toString() {
+        return String.format("name=%s, type=%s", name, type);
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java b/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java
index 5ca8d3e..a353e15 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java
@@ -23,15 +23,20 @@ import java.util.HashMap;
 import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.util.XLog;
 
 public class CredentialsProviderFactory {
     public static final String CRED_KEY = "oozie.credentials.credentialclasses";
     private static final XLog LOG = XLog.getLog(CredentialsProviderFactory.class);
+    public static final String HDFS = "hdfs";
+    public static final String YARN = "yarnRM";
+    public static final String JHS = "jhs";
     private static CredentialsProviderFactory instance;
-    private final Map<String, Class<CredentialsProvider>> providerCache;
+    private final Map<String, Class<? extends CredentialsProvider>> providerCache;
 
     @VisibleForTesting
     static void destroy() {
@@ -70,6 +75,14 @@ public class CredentialsProviderFactory {
                 }
             }
         }
+        providerCache.put(HDFS, HDFSCredentials.class);
+        providerCache.put(YARN, YarnRMCredentials.class);
+        providerCache.put(JHS, JHSCredentials.class);
+    }
+
+    static Text getUniqueAlias(Token<?> token) {
+        return new Text(String.format("%s_%s_%d", token.getKind().toString(),
+                token.getService().toString(), System.currentTimeMillis()));
     }
 
     /**
@@ -80,7 +93,7 @@ public class CredentialsProviderFactory {
      * @throws Exception
      */
     public CredentialsProvider createCredentialsProvider(String type) throws Exception {
-        Class<CredentialsProvider> providerClass = providerCache.get(type);
+        Class<? extends CredentialsProvider> providerClass = providerCache.get(type);
         if(providerClass == null){
             return null;
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java
index 9804c7b..274db78 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java
@@ -20,7 +20,6 @@ package org.apache.oozie.action.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SaslRpcServer;
@@ -63,8 +62,8 @@ public class HCatCredentialHelper {
                     .getLoginUser().getShortUserName());
             Token<DelegationTokenIdentifier> hcatToken = new Token<DelegationTokenIdentifier>();
             hcatToken.decodeFromUrlString(tokenStrForm);
-            credentials.addToken(new Text("HCat Token"), hcatToken);
-            XLog.getLog(getClass()).debug("Added the HCat token to launcher configuration");
+            credentials.addToken(CredentialsProviderFactory.getUniqueAlias(hcatToken), hcatToken);
+            XLog.getLog(getClass()).debug("Added the HCat token to launcher's credentials");
         }
         catch (Exception ex) {
             XLog.getLog(getClass()).debug("set Exception {0}", ex.getMessage());

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java
index 52abbf1..47b2407 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java
@@ -70,7 +70,7 @@ public class HCatCredentials implements CredentialsProvider {
             hcch.set(credentials, config, principal, server);
         }
         catch (Exception e) {
-            XLog.getLog(getClass()).warn("Exception in addtoJobConf", e);
+            XLog.getLog(getClass()).warn("Exception in updateCredentials", e);
             throw e;
         }
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java
new file mode 100644
index 0000000..c693399
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.util.XLog;
+
+
+public class HDFSCredentials implements CredentialsProvider {
+    protected XLog LOG = XLog.getLog(getClass());
+
+    /**
+     * Add an HDFS_DELEGATION_TOKEN to the {@link Credentials} provided.
+     * This is also important to ensure that log aggregation works correctly from the NM
+     *
+     * @param credentials the credentials object which is updated
+     * @param config launcher AM configuration
+     * @param props properties for getting credential token or certificate
+     * @param context workflow context
+     * @throws Exception thrown if failed
+     */
+    @Override
+    public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props,
+                                  ActionExecutor.Context context) throws Exception {
+        try (FileSystem fileSystem = context.getAppFileSystem()) {
+            final String renewer = new HadoopTokenHelper().getServerPrincipal(config);
+            LOG.debug("Server principal present, getting HDFS delegation token. [renewer={0}]", renewer);
+            final Token hdfsDelegationToken = fileSystem.getDelegationToken(renewer);
+            if (hdfsDelegationToken == null) {
+                throw new CredentialException(ErrorCode.E0511, renewer);
+            }
+            LOG.info("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]", hdfsDelegationToken);
+            credentials.addToken(hdfsDelegationToken.getService(), hdfsDelegationToken);
+        } catch (Exception e) {
+            LOG.debug("exception in updateCredentials", e);
+            throw e;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HadoopTokenHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HadoopTokenHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/HadoopTokenHelper.java
new file mode 100644
index 0000000..1018d9d
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HadoopTokenHelper.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.util.XLog;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+public class HadoopTokenHelper {
+    /** The Kerberos principal for the resource manager.*/
+    protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
+    protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
+    private XLog LOG = XLog.getLog(getClass());
+
+    private String getServicePrincipal(final Configuration configuration) {
+        return configuration.get(RM_PRINCIPAL);
+    }
+
+    String getServerPrincipal(final Configuration configuration) throws IOException {
+        return getServerPrincipal(configuration, getServicePrincipal(configuration));
+    }
+
+    /**
+     * Mimic {@link org.apache.hadoop.mapred.Master#getMasterPrincipal}, get Kerberos principal for use as delegation token renewer.
+     *
+     * @param configuration the {@link Configuration} containing the YARN RM address
+     * @param servicePrincipal the configured service principal
+     * @return the server principal originating from the host name and the service principal
+     * @throws IOException when something goes wrong finding out the local address inside
+     * {@link SecurityUtil#getServerPrincipal(String, String)}
+     */
+    private String getServerPrincipal(final Configuration configuration, final String servicePrincipal) throws IOException {
+        Preconditions.checkNotNull(configuration, "configuration has to be filled");
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(servicePrincipal), "servicePrincipal has to be filled");
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(configuration.get(HADOOP_YARN_RM)),
+                String.format("configuration entry %s has to be filled", HADOOP_YARN_RM));
+
+        String serverPrincipal;
+        final String target = configuration.get(HADOOP_YARN_RM);
+
+        try {
+            final String addr = NetUtils.createSocketAddr(target).getHostName();
+            serverPrincipal = SecurityUtil.getServerPrincipal(servicePrincipal, addr);
+            LOG.info("Delegation Token Renewer details: Principal={0},Target={1}", serverPrincipal, target);
+        } catch (final IllegalArgumentException iae) {
+            LOG.warn("An error happened while trying to get server principal. Getting it from service principal anyway.", iae);
+
+            serverPrincipal = servicePrincipal.split("[/@]")[0];
+            LOG.info("Delegation Token Renewer for {0} is {1}", target, serverPrincipal);
+        }
+
+        return serverPrincipal;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java
index 4add5f1..bc87f29 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java
@@ -79,6 +79,7 @@ public class HbaseCredentials implements CredentialsProvider {
         User u = User.create(ugi);
         // A direct doAs is required here vs. User#obtainAuthTokenForJob(...)
         // See OOZIE-2419 for more
+        XLog.getLog(getClass()).debug("Getting Hbase token for user {0}", user);
         Token<AuthenticationTokenIdentifier> token = u.runAs(
             new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
                 public Token<AuthenticationTokenIdentifier> run() throws Exception {
@@ -90,7 +91,8 @@ public class HbaseCredentials implements CredentialsProvider {
                 }
             }
         );
-        credentials.addToken(token.getService(), token);
+        XLog.getLog(getClass()).debug("Got token, adding it to credentials.");
+        credentials.addToken(CredentialsProviderFactory.getUniqueAlias(token), token);
     }
 
     private void addPropsConf(CredentialsProperties props, Configuration destConf) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java
index 0b495f7..d34f560 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java
@@ -23,7 +23,6 @@ import java.sql.DriverManager;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hive.jdbc.HiveConnection;
@@ -32,9 +31,9 @@ import org.apache.oozie.action.ActionExecutor.Context;
 import org.apache.oozie.util.XLog;
 
 /**
- * Credentials implementation to store in jobConf, Hive Server 2 specific properties
+ * Credentials implementation, Hive Server 2 specific properties
  * User specifies these credential properties along with the action configuration
- * The jobConf is used further to pass credentials to the tasks while running
+ * The credentials is used further to pass credentials to the tasks while running
  * Oozie server should be configured to use this class by including it via property 'oozie.credentials.credentialclasses'
  * User can extend the parent class to implement own class as well
  * for handling custom token-based credentials and add to the above server property
@@ -60,7 +59,7 @@ public class Hive2Credentials implements CredentialsProvider {
             String principal = props.getProperties().get(HIVE2_SERVER_PRINCIPAL);
             if (principal == null || principal.isEmpty()) {
                 throw new CredentialException(ErrorCode.E0510,
-                        HIVE2_SERVER_PRINCIPAL + " is required to get hive server 2 credential");
+                        HIVE2_SERVER_PRINCIPAL + " is required to get hive server 2 credentials");
             }
             url = url + ";principal=" + principal;
             Connection con = null;
@@ -79,12 +78,13 @@ public class Hive2Credentials implements CredentialsProvider {
 
             Token<DelegationTokenIdentifier> hive2Token = new Token<DelegationTokenIdentifier>();
             hive2Token.decodeFromUrlString(tokenStr);
-            credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token);
-            XLog.getLog(getClass()).debug("Added the Hive Server 2 token in job conf");
+            credentials.addToken(CredentialsProviderFactory.getUniqueAlias(hive2Token), hive2Token);
+            XLog.getLog(getClass()).debug("Added the Hive Server 2 token to launcher's credential");
         }
         catch (Exception e) {
-            XLog.getLog(getClass()).warn("Exception in addtoJobConf", e);
+            XLog.getLog(getClass()).warn("Exception in obtaining Hive2 token", e);
             throw e;
         }
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/JHSCredentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JHSCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/JHSCredentials.java
new file mode 100644
index 0000000..d9099c5
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JHSCredentials.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.UserGroupInformationService;
+import org.apache.oozie.util.XLog;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+
+public class JHSCredentials implements CredentialsProvider {
+    protected XLog LOG = XLog.getLog(getClass());
+
+
+    /**
+     * Add an MR_DELEGATION_TOKEN to the {@link Credentials} provided.
+     * @param credentials the credentials object which is updated
+     * @param config launcher AM configuration
+     * @param props properties for getting credential token or certificate
+     * @param context workflow context
+     * @throws Exception thrown if failed
+     */
+    @Override
+    public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props,
+                                  ActionExecutor.Context context) throws Exception {
+        try {
+            LOG.debug("Instantiating JHS Proxy");
+            MRClientProtocol hsProxy = instantiateHistoryProxy(config, context);
+            Text hsService = SecurityUtil.buildTokenService(hsProxy.getConnectAddress());
+            LOG.debug("Getting delegation token for {0}", hsService.toString());
+            Token<?> jhsToken = getDelegationTokenFromJHS(hsProxy, new HadoopTokenHelper().getServerPrincipal(config));
+            LOG.debug("Acquired token {0}", jhsToken);
+            credentials.addToken(hsService, jhsToken);
+        } catch (IOException | InterruptedException ex) {
+            LOG.debug("exception in updateCredentials", ex);
+            throw new CredentialException(ErrorCode.E0512, ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Get a Delegation token from the JHS.
+     * Copied over from YARNRunner in Hadoop.
+     * @param hsProxy protcol used to get the token
+     * @return The RM_DELEGATION_TOKEN that can be used to talk to JHS
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    private Token<?> getDelegationTokenFromJHS(final MRClientProtocol hsProxy, final String renewer)
+            throws IOException, InterruptedException {
+        GetDelegationTokenRequest request = RecordFactoryProvider
+                .getRecordFactory(null).newRecordInstance(GetDelegationTokenRequest.class);
+        LOG.debug("Creating requsest to JHS using renewer [{0}]", renewer);
+        request.setRenewer(renewer);
+        org.apache.hadoop.yarn.api.records.Token mrDelegationToken = hsProxy.getDelegationToken(request)
+                .getDelegationToken();
+        LOG.debug("Got token to JHS : {0}. Converting token.", mrDelegationToken);
+        return ConverterUtils.convertFromYarn(mrDelegationToken, hsProxy.getConnectAddress());
+    }
+
+    /**
+     * Create an MRClientProtocol to the JHS
+     * Copied over from ClientCache in Hadoop.
+     * @return the protocol that can be used to get a token with
+     * @throws IOException
+     */
+    private MRClientProtocol instantiateHistoryProxy(final Configuration configuration, final ActionExecutor.Context context)
+            throws IOException {
+        final String serviceAddr = configuration.get(JHAdminConfig.MR_HISTORY_ADDRESS);
+        if (StringUtils.isEmpty(serviceAddr)) {
+            return null;
+        }
+        LOG.debug("Connecting to JHS at: " + serviceAddr);
+        final YarnRPC rpc = YarnRPC.create(configuration);
+        LOG.debug("Connected to JHS at: " + serviceAddr);
+        UserGroupInformation currentUser = Services.get().get(UserGroupInformationService.class)
+                .getProxyUser(context.getWorkflow().getUser());
+        return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
+            @Override
+            public MRClientProtocol run() {
+                return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
+                        NetUtils.createSocketAddr(serviceAddr), configuration);
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 9d1afb5..be05603 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.TaskLog;
@@ -60,8 +59,6 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -530,9 +527,9 @@ public class JavaActionExecutor extends ActionExecutor {
             return conf;
         }
         catch (Exception ex) {
-            LOG.debug(
-                    "Errors when add to DistributedCache. Path=" + Objects.toString(uri, "<null>") + ", archive="
-                            + archive + ", conf=" + XmlUtils.prettyPrint(conf).toString());
+            LOG.debug("Errors when add to DistributedCache. Path=" +
+                    Objects.toString(uri, "<null>") + ", archive=" + archive + ", conf=" +
+                    XmlUtils.prettyPrint(conf).toString());
             throw convertException(ex);
         }
     }
@@ -1021,42 +1018,33 @@ public class JavaActionExecutor extends ActionExecutor {
                 }
             }
 
-            // Setting the credential properties in launcher conf
-            Configuration credentialsConf = null;
-
+            Credentials credentials = new Credentials();
+            Configuration launcherConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
+            yarnClient = createYarnClient(context, launcherConf);
             Map<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
                     action, actionConf);
-            Credentials credentials = null;
-            if (credentialsProperties != null) {
-                credentials = new Credentials();
-                // Adding if action need to set more credential tokens
-                credentialsConf = new Configuration(false);
-                XConfiguration.copy(actionConf, credentialsConf);
-                setCredentialTokens(credentials, credentialsConf, context, action, credentialsProperties);
-
-                // insert conf to action conf from credentialsConf
-                for (Entry<String, String> entry : credentialsConf) {
-                    if (actionConf.get(entry.getKey()) == null) {
-                        actionConf.set(entry.getKey(), entry.getValue());
-                    }
-                }
+            if (UserGroupInformation.isSecurityEnabled()) {
+                addHadoopCredentialPropertiesToActionConf(credentialsProperties);
             }
-            Configuration launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
+            // Adding if action need to set more credential tokens
+            Configuration credentialsConf = new Configuration(false);
+            XConfiguration.copy(actionConf, credentialsConf);
+            setCredentialTokens(credentials, credentialsConf, context, action, credentialsProperties);
 
+            // copy back new entries from credentialsConf
+            for (Entry<String, String> entry : credentialsConf) {
+                if (actionConf.get(entry.getKey()) == null) {
+                    actionConf.set(entry.getKey(), entry.getValue());
+                }
+            }
             String consoleUrl;
-            String launcherId = LauncherHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
+            String launcherId = LauncherHelper.getRecoveryId(launcherConf, context.getActionDir(), context
                     .getRecoveryId());
             boolean alreadyRunning = launcherId != null;
 
             // if user-retry is on, always submit new launcher
             boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
             LOG.debug("Creating yarnClient for action {0}", action.getId());
-            yarnClient = createYarnClient(context, launcherJobConf);
-
-            if (UserGroupInformation.isSecurityEnabled()) {
-                credentials = ensureCredentials(credentials);
-                acquireHDFSDelegationToken(actionFs, credentialsConf, credentials);
-            }
 
             if (alreadyRunning && !isUserRetry) {
                 try {
@@ -1066,51 +1054,16 @@ public class JavaActionExecutor extends ActionExecutor {
                 } catch (RemoteException e) {
                     // caught when the application id does not exist
                     LOG.error("Got RemoteException from YARN", e);
-                    String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
+                    String jobTracker = launcherConf.get(HADOOP_YARN_RM);
                     throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
                             "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
                 }
             }
             else {
-                // TODO: OYA: do we actually need an MR token?  IIRC, it's issued by the JHS
-//                // setting up propagation of the delegation token.
-//                Token<DelegationTokenIdentifier> mrdt = null;
-//                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
-//                mrdt = jobClient.getDelegationToken(has
-//                        .getMRDelegationTokenRenewer(launcherJobConf));
-//                launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
-
-                // insert credentials tokens to launcher job conf if needed
-                if (credentialsConf != null) {
-                    for (Token<? extends TokenIdentifier> tk :credentials.getAllTokens()) {
-                        Text fauxAlias = new Text(tk.getKind() + "_" + tk.getService());
-                        LOG.debug("ADDING TOKEN: " + fauxAlias);
-                        credentials.addToken(fauxAlias, tk);
-                    }
-                    if (credentials.numberOfSecretKeys() > 0) {
-                        for (Entry<String, CredentialsProperties> entry : credentialsProperties.entrySet()) {
-                            CredentialsProperties credProps = entry.getValue();
-                            if (credProps != null) {
-                                Text credName = new Text(credProps.getName());
-                                byte[] secKey = credentials.getSecretKey(credName);
-                                if (secKey != null) {
-                                    LOG.debug("ADDING CREDENTIAL: " + credProps.getName());
-                                    credentials.addSecretKey(credName, secKey);
-                                }
-                            }
-                        }
-                    }
-                }
-                else {
-                    LOG.info("No need to inject credentials.");
-                }
-
-                String user = context.getWorkflow().getUser();
-
                 YarnClientApplication newApp = yarnClient.createApplication();
                 ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
                 ApplicationSubmissionContext appContext =
-                        createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf, action.getName(),
+                        createAppSubmissionContext(appId, launcherConf, context, actionConf, action.getName(),
                                 credentials, actionXml);
                 yarnClient.submitApplication(appContext);
 
@@ -1120,7 +1073,7 @@ public class JavaActionExecutor extends ActionExecutor {
                 consoleUrl = appReport.getTrackingUrl();
             }
 
-            String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
+            String jobTracker = launcherConf.get(HADOOP_YARN_RM);
             context.setStartData(launcherId, jobTracker, consoleUrl);
         }
         catch (Exception ex) {
@@ -1146,58 +1099,19 @@ public class JavaActionExecutor extends ActionExecutor {
         return context.getVar(OOZIE_ACTION_NAME);
     }
 
-    private Credentials ensureCredentials(final Credentials credentials) {
-        if (credentials == null) {
-            LOG.debug("No credentials present, creating a new one.");
-            return new Credentials();
-        }
-
-        return credentials;
+    private void addHadoopCredentialPropertiesToActionConf(Map<String, CredentialsProperties> credentialsProperties) {
+        LOG.info("Adding default credentials for action: hdfs, yarn and jhs");
+        addHadoopCredentialProperties(credentialsProperties, CredentialsProviderFactory.HDFS);
+        addHadoopCredentialProperties(credentialsProperties, CredentialsProviderFactory.YARN);
+        addHadoopCredentialProperties(credentialsProperties, CredentialsProviderFactory.JHS);
     }
 
-    /**
-     * In a secure environment, when both HDFS HA and log aggregation are turned on, {@link JavaActionExecutor} is not able to call
-     * {@link YarnClient#submitApplication} since {@code HDFS_DELEGATION_TOKEN} is missing.
-     *
-     * @param actionFs the {@link FileSystem} to get the delegation token from
-     * @param credentialsConf the {@link Configuration} to extract the YARN renewer
-     * @param credentials the {@link Credentials} where the delegation token is stored
-     * @throws IOException
-     * @throws ActionExecutorException when security is enabled, but either {@code credentials} are empty, or
-     * {@code serverPrincipal} is empty, or HDFS delegation token is not present within {@code actionFs}
-     */
-    private void acquireHDFSDelegationToken(final FileSystem actionFs,
-                                            final Configuration credentialsConf,
-                                            final Credentials credentials)
-            throws IOException, ActionExecutorException {
-        LOG.debug("Security is enabled, checking credentials to acquire HDFS delegation token.");
-
-        final HadoopAccessorService hadoopAccessorService = Services.get().get(HadoopAccessorService.class);
-        final String servicePrincipal = hadoopAccessorService.getServicePrincipal(credentialsConf);
-        final String serverPrincipal = hadoopAccessorService.getServerPrincipal(
-                credentialsConf,
-                servicePrincipal);
-        if (serverPrincipal == null) {
-            final String errorTemplate = "No server principal present, won't get HDFS delegation token. [servicePrincipal={0}]";
-            LOG.error(errorTemplate, servicePrincipal);
-            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA022", errorTemplate, servicePrincipal);
-        }
-
-        LOG.debug("Server principal present, getting HDFS delegation token. [serverPrincipal={0}]", serverPrincipal);
-        final Token hdfsDelegationToken = actionFs.getDelegationToken(serverPrincipal);
-        if (hdfsDelegationToken == null) {
-            final String errorTemplate = "No HDFS delegation token present, won't set credentials. [serverPrincipal={0}]";
-            LOG.error(errorTemplate, serverPrincipal);
-            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA022", errorTemplate, serverPrincipal);
-        }
-
-        LOG.debug("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]",
-                hdfsDelegationToken);
-        credentials.addToken(new Text(hdfsDelegationToken.getService().toString()), hdfsDelegationToken);
+    private void addHadoopCredentialProperties(Map<String, CredentialsProperties> credentialsProperties, String type) {
+        credentialsProperties.put(type, new CredentialsProperties(type, type));
     }
 
     private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, Configuration launcherJobConf,
-                                                                    String user, Context context, Configuration actionConf, String actionName,
+                                                                    Context context, Configuration actionConf, String actionName,
                                                                     Credentials credentials, Element actionXml)
             throws IOException, HadoopAccessorException, URISyntaxException {
 
@@ -1212,12 +1126,14 @@ public class JavaActionExecutor extends ActionExecutor {
 
         ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
 
+        final String user = context.getWorkflow().getUser();
         // Set the resources to localize
         Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
         ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(launcherJobConf);
         MRApps.setupDistributedCache(launcherJobConf, localResources);
         // Add the Launcher and Action configs as Resources
         HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+        launcherJobConf.set(LauncherAM.OOZIE_SUBMITTER_USER, user);
         LocalResource launcherJobConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.LAUNCHER_JOB_CONF_XML, user,
                 launcherJobConf, context.getAppFileSystem().getUri(), context.getActionDir());
         localResources.put(LauncherAM.LAUNCHER_JOB_CONF_XML, launcherJobConfLR);
@@ -1398,37 +1314,38 @@ public class JavaActionExecutor extends ActionExecutor {
         return envMap;
     }
 
-    protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
-            WorkflowAction action, Configuration actionConf) throws Exception {
-        HashMap<String, CredentialsProperties> credPropertiesMap = null;
-        if (context != null && action != null) {
-            if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) {
-                XConfiguration wfJobConf = getWorkflowConf(context);
-                if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) ||
-                    !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) {
-                    credPropertiesMap = getActionCredentialsProperties(context, action);
-                    if (!credPropertiesMap.isEmpty()) {
-                        for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
-                            if (entry.getValue() != null) {
-                                CredentialsProperties prop = entry.getValue();
-                                LOG.debug("Credential Properties set for action : " + action.getId());
-                                for (Entry<String, String> propEntry : prop.getProperties().entrySet()) {
-                                    String key = propEntry.getKey();
-                                    String value = propEntry.getValue();
-                                    actionConf.set(key, value);
-                                    LOG.debug("property : '" + key + "', value : '" + value + "'");
-                                }
-                            }
-                        }
-                    } else {
-                        LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
+   Map<String, CredentialsProperties> setCredentialPropertyToActionConf(final Context context,
+                                                                         final WorkflowAction action,
+                                                                         final Configuration actionConf) throws Exception {
+        final Map<String, CredentialsProperties> credPropertiesMap = new HashMap<>();
+        if (context == null || action == null) {
+            LOG.warn("context or action is null");
+            return credPropertiesMap;
+        }
+        final XConfiguration wfJobConf = getWorkflowConf(context);
+        final boolean skipCredentials = actionConf.getBoolean(OOZIE_CREDENTIALS_SKIP,
+                wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP)));
+        if (skipCredentials) {
+            LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
+        } else {
+            credPropertiesMap.putAll(getActionCredentialsProperties(context, action));
+            if (credPropertiesMap.isEmpty()) {
+                LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
+                return credPropertiesMap;
+            }
+            for (final Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
+                if (entry.getValue() != null) {
+                    final CredentialsProperties prop = entry.getValue();
+                    LOG.debug("Credential Properties set for action : " + action.getId());
+                    for (final Entry<String, String> propEntry : prop.getProperties().entrySet()) {
+                        final String key = propEntry.getKey();
+                        final String value = propEntry.getValue();
+                        actionConf.set(key, value);
+                        LOG.debug("property : '" + key + "', value : '" + value + "'");
                     }
-                } else {
-                    LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
                 }
             }
         }
-
         return credPropertiesMap;
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/YarnRMCredentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/YarnRMCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/YarnRMCredentials.java
new file mode 100644
index 0000000..061ca05
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/YarnRMCredentials.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.XLog;
+
+public class YarnRMCredentials implements CredentialsProvider {
+    /**
+     * Add an RM_DELEGATION_TOKEN to the {@link Credentials} provided.
+     *
+     * @param credentials the credentials object which is updated
+     * @param config launcher AM configuration
+     * @param props properties for getting credential token or certificate
+     * @param context workflow context
+     * @throws Exception thrown if failed
+     */
+    @Override
+    public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props,
+                                  ActionExecutor.Context context) throws Exception {
+        Text rmDelegationTokenService = ClientRMProxy.getRMDelegationTokenService(config);
+        if (rmDelegationTokenService == null) {
+            throw new CredentialException(ErrorCode.E0512, "Can't create RMDelegationTokenService");
+        }
+        try (YarnClient yarnClient = Services.get().get(HadoopAccessorService.class)
+                .createYarnClient(context.getWorkflow().getUser(), config)) {
+            org.apache.hadoop.yarn.api.records.Token rmDelegationToken =
+                    yarnClient.getRMDelegationToken(new Text(new HadoopTokenHelper().getServerPrincipal(config)));
+            if (rmDelegationToken == null) {
+                throw new CredentialException(ErrorCode.E0512, "Returned token is null");
+            }
+            Token<TokenIdentifier> rmToken = ConverterUtils.convertFromYarn(rmDelegationToken, rmDelegationTokenService);
+            credentials.addToken(rmDelegationTokenService, rmToken);
+        } catch (Exception e) {
+            XLog.getLog(getClass()).debug("Exception in updateCredentials", e);
+            throw e;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index 187cee2..73300a6 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -20,23 +20,38 @@ package org.apache.oozie.service;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Master;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.action.hadoop.JavaActionExecutor;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.ParamChecker;
@@ -55,6 +70,7 @@ import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -84,16 +100,8 @@ public class HadoopAccessorService implements Service {
     public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
     public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
     public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
-    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");
-
-    /** The Kerberos principal for the job tracker.*/
-    protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
-    /** The Kerberos principal for the resource manager.*/
-    protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
-    protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
 
     private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
-    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();
     private static final String DEFAULT_ACTIONNAME = "default";
     private static Configuration cachedConf;
 
@@ -335,7 +343,7 @@ public class HadoopAccessorService implements Service {
         return HadoopAccessorService.class;
     }
 
-    private UserGroupInformation getUGI(String user) throws IOException {
+    UserGroupInformation getUGI(String user) throws IOException {
         return ugiService.getProxyUser(user);
     }
 
@@ -650,71 +658,6 @@ public class HadoopAccessorService implements Service {
         }
     }
 
-    public Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
-        if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
-            return getMRTokenRenewerInternal(jobConf);
-        }
-        else {
-            return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
-        }
-    }
-
-    // Package private for unit test purposes
-    Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
-        // Getting renewer correctly for JT principal also though JT in hadoop 1.x does not have
-        // support for renewing/cancelling tokens
-        final String servicePrincipal = getServicePrincipal(jobConf);
-        Text renewer;
-        if (servicePrincipal != null) { // secure cluster
-            renewer = mrTokenRenewers.get(servicePrincipal);
-            if (renewer == null) {
-                renewer = new Text(getServerPrincipal(jobConf, servicePrincipal));
-                mrTokenRenewers.put(servicePrincipal, renewer);
-            }
-        }
-        else {
-            renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
-        }
-        return renewer;
-    }
-
-    public String getServicePrincipal(final Configuration configuration) {
-        return configuration.get(RM_PRINCIPAL, configuration.get(JT_PRINCIPAL));
-    }
-
-    /**
-     * Mimic {@link org.apache.hadoop.mapred.Master#getMasterPrincipal}, get Kerberos principal for use as delegation token renewer.
-     *
-     * @param configuration the {@link Configuration} containing the YARN RM address
-     * @param servicePrincipal the configured service principal
-     * @return the server principal originating from the host name and the service principal
-     * @throws IOException when something goes wrong finding out the local address inside
-     * {@link SecurityUtil#getServerPrincipal(String, String)}
-     */
-    public String getServerPrincipal(final Configuration configuration, final String servicePrincipal) throws IOException {
-        Preconditions.checkNotNull(configuration, "configuration has to be filled");
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(servicePrincipal), "servicePrincipal has to be filled");
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(configuration.get(HADOOP_YARN_RM)),
-                String.format("configuration entry %s has to be filled", HADOOP_YARN_RM));
-
-        String serverPrincipal;
-        final String target = configuration.get(HADOOP_YARN_RM);
-
-        try {
-            final String addr = NetUtils.createSocketAddr(target).getHostName();
-            serverPrincipal = SecurityUtil.getServerPrincipal(servicePrincipal, addr);
-            LOG.info("Delegation Token Renewer details: Principal={0},Target={1}", serverPrincipal, target);
-        }
-        catch (final IllegalArgumentException iae) {
-            LOG.warn("An error happened while trying to get server principal. Getting it from service principal anyway.", iae);
-
-            serverPrincipal = servicePrincipal.split("[/@]")[0];
-            LOG.info("Delegation Token Renewer for {0} is {1}", target, serverPrincipal);
-        }
-
-        return serverPrincipal;
-    }
-
     public void addFileToClassPath(String user, final Path file, final Configuration conf)
             throws IOException {
         ParamChecker.notEmpty(user, "user");

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopTokenHelper.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopTokenHelper.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopTokenHelper.java
new file mode 100644
index 0000000..17cdd9d
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopTokenHelper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHadoopTokenHelper {
+
+    @Test
+    public void testGetMRDelegationTokenRenewer() throws Exception {
+        HadoopTokenHelper hadoopTokenHelper = new HadoopTokenHelper();
+        Configuration configuration = new Configuration(false);
+        configuration.set("yarn.resourcemanager.address", "localhost:8032");
+        configuration.set("yarn.resourcemanager.principal", "rm/server.com@KDC.DOMAIN.COM");
+        assertEquals("yarn setup","rm/server.com@KDC.DOMAIN.COM",
+                hadoopTokenHelper.getServerPrincipal(configuration));
+
+        configuration = new Configuration(false);
+        configuration.set("yarn.resourcemanager.address", "rm-ha-uri");
+        configuration.set("yarn.resourcemanager.principal", "rm/server.com@KDC.DOMAIN.COM");
+        assertEquals("yarn ha setup","rm",
+                hadoopTokenHelper.getServerPrincipal(configuration));
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 02e60c0..0d1dd97 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -1083,7 +1083,6 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         // Define 'abc' token type in oozie-site
         ConfigurationService.set("oozie.credentials.credentialclasses", "abc=org.apache.oozie.action.hadoop.InsertTestToken");
         ConfigurationService.setBoolean("oozie.credentials.skip", skipSite);
-
         // Setting the credential properties in launcher conf
         Map<String, CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context,
                 action, actionConf);

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
index a7d6c18..de67365 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
@@ -48,7 +48,7 @@ public class TestShellMain extends ShellTestCase {
         jobConf.setInt("mapred.reduce.max.attempts", 1);
         jobConf.set("mapred.job.tracker", getJobTrackerUri());
         jobConf.set("fs.default.name", getNameNodeUri());
-
+        jobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, System.currentTimeMillis());
 
         jobConf.set(ShellMain.CONF_OOZIE_SHELL_EXEC, SHELL_COMMAND_NAME);
         String[] args = new String[] { SHELL_COMMAND_SCRIPTFILE_OPTION, script.toString(), "A", "B" };

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
index cf77f18..98a41a1 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
@@ -180,7 +180,7 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
         jobConf = sleepjob.setupJobConf(1, 1, sleep, 1, sleep, 1);
         jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, "sleepjob");
         jobConf.set(LauncherMain.MAPREDUCE_JOB_TAGS, "sleepjob");
-        System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis()));
+        jobConf.set(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis()));
 
         jobClient.submitJob(new JobConf(jobConf));
         Set<ApplicationId> apps = LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL);

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
index 960c2f9..89ce185 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
@@ -228,37 +228,6 @@ public class TestHadoopAccessorService extends XFsTestCase {
         }
     }
 
-    public void testGetMRDelegationTokenRenewer() throws Exception {
-        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
-        JobConf jobConf = new JobConf(false);
-        assertEquals(new Text("oozie mr token"), has.getMRTokenRenewerInternal(jobConf));
-        jobConf.set("yarn.resourcemanager.address", "localhost:50300");
-        jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_HOST@KDC.DOMAIN.COM");
-        assertEquals(new Text("mapred/localhost@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
-        jobConf = new JobConf(false);
-        jobConf.set("mapreduce.jobtracker.address", "127.0.0.1:50300");
-        jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_HOST@KDC.DOMAIN.COM");
-        assertEquals(new Text("mapred/localhost@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
-        jobConf = new JobConf(false);
-        jobConf.set("yarn.resourcemanager.address", "localhost:8032");
-        jobConf.set("yarn.resourcemanager.principal", "rm/server.com@KDC.DOMAIN.COM");
-        assertEquals(new Text("rm/server.com@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
-
-        // Try the above with logical URIs (i.e. for HA)
-        jobConf = new JobConf(false);
-        jobConf.set("mapred.job.tracker", "jt-ha-uri");
-        jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_HOST@KDC.DOMAIN.COM");
-        assertEquals(new Text("mapred/localhost@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
-        jobConf = new JobConf(false);
-        jobConf.set("mapreduce.jobtracker.address", "jt-ha-uri");
-        jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_HOST@KDC.DOMAIN.COM");
-        assertEquals(new Text("mapred/localhost@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
-        jobConf = new JobConf(false);
-        jobConf.set("yarn.resourcemanager.address", "rm-ha-uri");
-        jobConf.set("yarn.resourcemanager.principal", "rm/server.com@KDC.DOMAIN.COM");
-        assertEquals(new Text("rm/server.com@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
-    }
-
     public void testCheckSupportedFilesystem() throws Exception {
         Configuration hConf = Services.get().getConf();
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 82621ca..bacd686 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.0.0 release (trunk - unreleased)
 
+OOZIE-2909 LauncherAM: rewrite UGI calls (gezapeti)
 OOZIE-2687 Create XML schema for launcher configurations (asasvari) 
 OOZIE-3041 TestWorkflowActionRetryInfoXCommand fails in oozie core module (andras.piros via gezapeti)
 OOZIE-2916 Set a job name for the MR Action's child job (asasvari) 

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
index b4cbb4b..bfb3d76 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
@@ -22,11 +22,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 
 public class AMRMClientAsyncFactory {
 
-    public AMRMClientAsync<?> createAMRMClientAsync(int intervalMs) {
+    public AMRMClientAsync<?> createAMRMClientAsync(int intervalMs, AMRMCallBackHandler callBackHandler) {
         AMRMClient<?> amRmClient = AMRMClient.createAMRMClient();
-        AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler();
         AMRMClientAsync<?> amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, intervalMs, callBackHandler);
-
         return amRmClientAsync;
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
index 874d371..fcb0a92 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
@@ -40,85 +40,59 @@ import com.google.common.base.Preconditions;
 public class HdfsOperations {
     private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
     private final SequenceFileWriterFactory seqFileWriterFactory;
-    private final UserGroupInformation ugi;
 
-    public HdfsOperations(SequenceFileWriterFactory seqFileWriterFactory, UserGroupInformation ugi) {
+    public HdfsOperations(SequenceFileWriterFactory seqFileWriterFactory) {
         this.seqFileWriterFactory = Preconditions.checkNotNull(seqFileWriterFactory, "seqFileWriterFactory should not be null");
-        this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null");
     }
 
     /**
      * Creates a Sequence file which contains the output from an action and uploads it to HDFS.
      */
     public void uploadActionDataToHDFS(final Configuration launcherJobConf, final Path actionDir,
-            final Map<String, String> actionData) throws IOException, InterruptedException {
-        ugi.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                Path finalPath = new Path(actionDir, LauncherAM.ACTION_DATA_SEQUENCE_FILE);
-                // upload into sequence file
-                System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " + finalPath.toUri());
+                                       final Map<String, String> actionData) throws IOException, InterruptedException {
+        Path finalPath = new Path(actionDir, LauncherAM.ACTION_DATA_SEQUENCE_FILE);
+        // upload into sequence file
+        System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " + finalPath.toUri());
 
-                try (SequenceFile.Writer wr =
-                        seqFileWriterFactory.createSequenceFileWriter(launcherJobConf, finalPath, Text.class, Text.class)) {
+        try (SequenceFile.Writer wr =
+                     seqFileWriterFactory.createSequenceFileWriter(launcherJobConf, finalPath, Text.class, Text.class)) {
 
-                    if (wr != null) {
-                        for (Entry<String, String> entry : actionData.entrySet()) {
-                            wr.append(new Text(entry.getKey()), new Text(entry.getValue()));
-                        }
-                    } else {
-                        throw new IOException("SequenceFile.Writer is null for " + finalPath);
-                    }
+            if (wr != null) {
+                for (Entry<String, String> entry : actionData.entrySet()) {
+                    wr.append(new Text(entry.getKey()), new Text(entry.getValue()));
                 }
-
-                return null;
+            } else {
+                throw new IOException("SequenceFile.Writer is null for " + finalPath);
             }
-        });
+        }
     }
 
     public boolean fileExists(final Path path, final Configuration launcherJobConf) throws IOException, InterruptedException {
-        return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
-            @Override
-            public Boolean run() throws Exception {
-                FileSystem fs = FileSystem.get(path.toUri(), launcherJobConf);
-                return fs.exists(path);
-            }
-        });
+        FileSystem fs = FileSystem.get(path.toUri(), launcherJobConf);
+        return fs.exists(path);
     }
 
     public void writeStringToFile(final Path path, final Configuration conf, final String contents)
             throws IOException, InterruptedException {
-        ugi.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                try (FileSystem fs = FileSystem.get(path.toUri(), conf);
-                        java.io.Writer writer = new OutputStreamWriter(fs.create(path), DEFAULT_CHARSET)) {
-                    writer.write(contents);
-                }
-
-                return null;
-            }
-        });
+        try (FileSystem fs = FileSystem.get(path.toUri(), conf);
+             java.io.Writer writer = new OutputStreamWriter(fs.create(path), DEFAULT_CHARSET)) {
+            writer.write(contents);
+        }
     }
 
     public String readFileContents(final Path path, final Configuration conf) throws IOException, InterruptedException {
-        return ugi.doAs(new PrivilegedExceptionAction<String>() {
-            @Override
-            public String run() throws Exception {
-                StringBuilder sb = new StringBuilder();
-
-                try (FileSystem fs = FileSystem.get(path.toUri(), conf);
-                        InputStream is = fs.open(path);
-                        BufferedReader reader = new BufferedReader(new InputStreamReader(is, DEFAULT_CHARSET))) {
+        StringBuilder sb = new StringBuilder();
 
-                    String contents;
-                    while ((contents = reader.readLine()) != null) {
-                        sb.append(contents);
-                    }
-                }
+        try (FileSystem fs = FileSystem.get(path.toUri(), conf);
+             InputStream is = fs.open(path);
+             BufferedReader reader = new BufferedReader(new InputStreamReader(is, DEFAULT_CHARSET))) {
 
-                return sb.toString();
+            String contents;
+            while ((contents = reader.readLine()) != null) {
+                sb.append(contents);
             }
-        });
+        }
+
+        return sb.toString();
     }
 }