Posted to commits@oozie.apache.org by ge...@apache.org on 2017/09/20 10:17:35 UTC
[2/2] oozie git commit: OOZIE-2909 LauncherAM: rewrite UGI calls (gezapeti)
OOZIE-2909 LauncherAM: rewrite UGI calls (gezapeti)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9cb4bd05
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9cb4bd05
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9cb4bd05
Branch: refs/heads/master
Commit: 9cb4bd05aa4a86d2193efe5f7bb9ef53f8fc33d2
Parents: d135b88
Author: Gezapeti Cseh <ge...@apache.org>
Authored: Wed Sep 20 12:17:27 2017 +0200
Committer: Gezapeti Cseh <ge...@apache.org>
Committed: Wed Sep 20 12:17:27 2017 +0200
----------------------------------------------------------------------
.../main/java/org/apache/oozie/ErrorCode.java | 3 +
.../action/hadoop/CredentialsProperties.java | 5 +
.../hadoop/CredentialsProviderFactory.java | 17 +-
.../action/hadoop/HCatCredentialHelper.java | 5 +-
.../oozie/action/hadoop/HCatCredentials.java | 2 +-
.../oozie/action/hadoop/HDFSCredentials.java | 61 +++
.../oozie/action/hadoop/HadoopTokenHelper.java | 85 +++++
.../oozie/action/hadoop/HbaseCredentials.java | 4 +-
.../oozie/action/hadoop/Hive2Credentials.java | 14 +-
.../oozie/action/hadoop/JHSCredentials.java | 119 ++++++
.../oozie/action/hadoop/JavaActionExecutor.java | 203 +++-------
.../oozie/action/hadoop/YarnRMCredentials.java | 67 ++++
.../oozie/service/HadoopAccessorService.java | 91 +----
.../action/hadoop/TestHadoopTokenHelper.java | 44 +++
.../action/hadoop/TestJavaActionExecutor.java | 1 -
.../oozie/action/hadoop/TestShellMain.java | 2 +-
.../wf/TestWorkflowActionKillXCommand.java | 2 +-
.../service/TestHadoopAccessorService.java | 31 --
release-log.txt | 1 +
.../action/hadoop/AMRMClientAsyncFactory.java | 4 +-
.../oozie/action/hadoop/HdfsOperations.java | 84 ++---
.../apache/oozie/action/hadoop/LauncherAM.java | 367 ++++++++++---------
.../apache/oozie/action/hadoop/ShellMain.java | 1 -
.../oozie/action/hadoop/TestHdfsOperations.java | 11 -
.../oozie/action/hadoop/TestLauncherAM.java | 36 +-
25 files changed, 728 insertions(+), 532 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/ErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java
index 662e1ed..168c4fa 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -89,6 +89,9 @@ public enum ErrorCode {
E0508(XLog.OPS, "User [{0}] not authorized for WF job [{1}]"),
E0509(XLog.OPS, "User [{0}] not authorized for Coord job [{1}]"),
E0510(XLog.OPS, "Unable to get Credential [{0}]"),
+ E0511(XLog.STD, "No HDFS delegation token present, can''t set credentials. [serverPrincipal={0}]"),
+ E0512(XLog.STD, "Could not get RM delegation token: {0}"),
+ E0513(XLog.STD, "No YARN renewer present, can''t get token. [servicePrincipal={0}]"),
E0550(XLog.OPS, "Could not normalize host name [{0}], {1}"),
E0551(XLog.OPS, "Missing [{0}] property"),
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java b/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java
index 20f93ce..3dea787 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java
@@ -84,4 +84,9 @@ public class CredentialsProperties {
public void setProperties(HashMap<String, String> properties) {
this.properties = properties;
}
+
+ @Override
+ public String toString() {
+ return String.format("name=%s, type=%s", name, type);
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java b/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java
index 5ca8d3e..a353e15 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java
@@ -23,15 +23,20 @@ import java.util.HashMap;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.util.XLog;
public class CredentialsProviderFactory {
public static final String CRED_KEY = "oozie.credentials.credentialclasses";
private static final XLog LOG = XLog.getLog(CredentialsProviderFactory.class);
+ public static final String HDFS = "hdfs";
+ public static final String YARN = "yarnRM";
+ public static final String JHS = "jhs";
private static CredentialsProviderFactory instance;
- private final Map<String, Class<CredentialsProvider>> providerCache;
+ private final Map<String, Class<? extends CredentialsProvider>> providerCache;
@VisibleForTesting
static void destroy() {
@@ -70,6 +75,14 @@ public class CredentialsProviderFactory {
}
}
}
+ providerCache.put(HDFS, HDFSCredentials.class);
+ providerCache.put(YARN, YarnRMCredentials.class);
+ providerCache.put(JHS, JHSCredentials.class);
+ }
+
+ static Text getUniqueAlias(Token<?> token) {
+ return new Text(String.format("%s_%s_%d", token.getKind().toString(),
+ token.getService().toString(), System.currentTimeMillis()));
}
/**
@@ -80,7 +93,7 @@ public class CredentialsProviderFactory {
* @throws Exception
*/
public CredentialsProvider createCredentialsProvider(String type) throws Exception {
- Class<CredentialsProvider> providerClass = providerCache.get(type);
+ Class<? extends CredentialsProvider> providerClass = providerCache.get(type);
if(providerClass == null){
return null;
}
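Note on the new getUniqueAlias() helper above: it keys each token by its kind, its service and a timestamp, so two tokens of the same kind no longer overwrite each other under a fixed alias such as "HCat Token". A minimal standalone sketch of the resulting alias format (the kind and service values below are illustrative, not taken from this patch):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;

    public class UniqueAliasSketch {
        public static void main(String[] args) {
            // A freshly constructed token has an empty kind and service; set sample values.
            Token<TokenIdentifier> token = new Token<>();
            token.setKind(new Text("HDFS_DELEGATION_TOKEN"));
            token.setService(new Text("ha-hdfs:ns1"));
            // Mirrors CredentialsProviderFactory.getUniqueAlias(): kind_service_timestamp
            Text alias = new Text(String.format("%s_%s_%d",
                    token.getKind().toString(), token.getService().toString(),
                    System.currentTimeMillis()));
            System.out.println(alias); // e.g. HDFS_DELEGATION_TOKEN_ha-hdfs:ns1_1505902655123
        }
    }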
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java
index 9804c7b..274db78 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java
@@ -20,7 +20,6 @@ package org.apache.oozie.action.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SaslRpcServer;
@@ -63,8 +62,8 @@ public class HCatCredentialHelper {
.getLoginUser().getShortUserName());
Token<DelegationTokenIdentifier> hcatToken = new Token<DelegationTokenIdentifier>();
hcatToken.decodeFromUrlString(tokenStrForm);
- credentials.addToken(new Text("HCat Token"), hcatToken);
- XLog.getLog(getClass()).debug("Added the HCat token to launcher configuration");
+ credentials.addToken(CredentialsProviderFactory.getUniqueAlias(hcatToken), hcatToken);
+ XLog.getLog(getClass()).debug("Added the HCat token to launcher's credentials");
}
catch (Exception ex) {
XLog.getLog(getClass()).debug("set Exception {0}", ex.getMessage());
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java
index 52abbf1..47b2407 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java
@@ -70,7 +70,7 @@ public class HCatCredentials implements CredentialsProvider {
hcch.set(credentials, config, principal, server);
}
catch (Exception e) {
- XLog.getLog(getClass()).warn("Exception in addtoJobConf", e);
+ XLog.getLog(getClass()).warn("Exception in updateCredentials", e);
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java
new file mode 100644
index 0000000..c693399
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.util.XLog;
+
+
+public class HDFSCredentials implements CredentialsProvider {
+ protected XLog LOG = XLog.getLog(getClass());
+
+ /**
+ * Add an HDFS_DELEGATION_TOKEN to the {@link Credentials} provided.
+ * This is also important to ensure that log aggregation works correctly from the NM
+ *
+ * @param credentials the credentials object which is updated
+ * @param config launcher AM configuration
+ * @param props properties for getting credential token or certificate
+ * @param context workflow context
+ * @throws Exception thrown if failed
+ */
+ @Override
+ public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props,
+ ActionExecutor.Context context) throws Exception {
+ try (FileSystem fileSystem = context.getAppFileSystem()) {
+ final String renewer = new HadoopTokenHelper().getServerPrincipal(config);
+ LOG.debug("Server principal present, getting HDFS delegation token. [renewer={0}]", renewer);
+ final Token hdfsDelegationToken = fileSystem.getDelegationToken(renewer);
+ if (hdfsDelegationToken == null) {
+ throw new CredentialException(ErrorCode.E0511, renewer);
+ }
+ LOG.info("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]", hdfsDelegationToken);
+ credentials.addToken(hdfsDelegationToken.getService(), hdfsDelegationToken);
+ } catch (Exception e) {
+ LOG.debug("exception in updateCredentials", e);
+ throw e;
+ }
+ }
+
+}
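For reference, the essential flow of this new provider in isolation — a minimal sketch assuming Kerberos is enabled and that conf and renewer are already resolved (the renewer would come from HadoopTokenHelper#getServerPrincipal, below):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.token.Token;

    public class HdfsTokenSketch {
        // Obtain an HDFS delegation token and store it under its service name,
        // as HDFSCredentials.updateCredentials() does above.
        static void addHdfsToken(Configuration conf, Credentials credentials, String renewer)
                throws IOException {
            try (FileSystem fs = FileSystem.get(conf)) {
                Token<?> token = fs.getDelegationToken(renewer);
                if (token == null) {
                    throw new IOException("No HDFS delegation token returned for renewer " + renewer);
                }
                credentials.addToken(token.getService(), token);
            }
        }
    }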
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HadoopTokenHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HadoopTokenHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/HadoopTokenHelper.java
new file mode 100644
index 0000000..1018d9d
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HadoopTokenHelper.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.util.XLog;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+public class HadoopTokenHelper {
+ /** The Kerberos principal for the resource manager.*/
+ protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
+ protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
+ private XLog LOG = XLog.getLog(getClass());
+
+ private String getServicePrincipal(final Configuration configuration) {
+ return configuration.get(RM_PRINCIPAL);
+ }
+
+ String getServerPrincipal(final Configuration configuration) throws IOException {
+ return getServerPrincipal(configuration, getServicePrincipal(configuration));
+ }
+
+ /**
+ * Mimic {@link org.apache.hadoop.mapred.Master#getMasterPrincipal}, get Kerberos principal for use as delegation token renewer.
+ *
+ * @param configuration the {@link Configuration} containing the YARN RM address
+ * @param servicePrincipal the configured service principal
+ * @return the server principal originating from the host name and the service principal
+ * @throws IOException when something goes wrong finding out the local address inside
+ * {@link SecurityUtil#getServerPrincipal(String, String)}
+ */
+ private String getServerPrincipal(final Configuration configuration, final String servicePrincipal) throws IOException {
+ Preconditions.checkNotNull(configuration, "configuration has to be filled");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(servicePrincipal), "servicePrincipal has to be filled");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(configuration.get(HADOOP_YARN_RM)),
+ String.format("configuration entry %s has to be filled", HADOOP_YARN_RM));
+
+ String serverPrincipal;
+ final String target = configuration.get(HADOOP_YARN_RM);
+
+ try {
+ final String addr = NetUtils.createSocketAddr(target).getHostName();
+ serverPrincipal = SecurityUtil.getServerPrincipal(servicePrincipal, addr);
+ LOG.info("Delegation Token Renewer details: Principal={0},Target={1}", serverPrincipal, target);
+ } catch (final IllegalArgumentException iae) {
+ LOG.warn("An error happened while trying to get server principal. Getting it from service principal anyway.", iae);
+
+ serverPrincipal = servicePrincipal.split("[/@]")[0];
+ LOG.info("Delegation Token Renewer for {0} is {1}", target, serverPrincipal);
+ }
+
+ return serverPrincipal;
+ }
+}
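To make the renewer resolution concrete: with a _HOST-style RM principal, SecurityUtil.getServerPrincipal() substitutes the RM host name taken from yarn.resourcemanager.address. A small sketch with illustrative values:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.security.SecurityUtil;

    public class ServerPrincipalSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration(false);
            conf.set("yarn.resourcemanager.address", "rm1.example.com:8032");
            conf.set("yarn.resourcemanager.principal", "rm/_HOST@KDC.DOMAIN.COM");
            String host = NetUtils.createSocketAddr(conf.get("yarn.resourcemanager.address"))
                    .getHostName();
            // _HOST is replaced with the RM host, yielding rm/rm1.example.com@KDC.DOMAIN.COM
            System.out.println(SecurityUtil.getServerPrincipal(
                    conf.get("yarn.resourcemanager.principal"), host));
        }
    }

For a logical (HA) RM URI that cannot be parsed into host and port, the fallback branch above strips the principal down to its service part (e.g. "rm"), as exercised by TestHadoopTokenHelper further down.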
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java
index 4add5f1..bc87f29 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java
@@ -79,6 +79,7 @@ public class HbaseCredentials implements CredentialsProvider {
User u = User.create(ugi);
// A direct doAs is required here vs. User#obtainAuthTokenForJob(...)
// See OOZIE-2419 for more
+ XLog.getLog(getClass()).debug("Getting Hbase token for user {0}", user);
Token<AuthenticationTokenIdentifier> token = u.runAs(
new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
public Token<AuthenticationTokenIdentifier> run() throws Exception {
@@ -90,7 +91,8 @@ public class HbaseCredentials implements CredentialsProvider {
}
}
);
- credentials.addToken(token.getService(), token);
+ XLog.getLog(getClass()).debug("Got token, adding it to credentials.");
+ credentials.addToken(CredentialsProviderFactory.getUniqueAlias(token), token);
}
private void addPropsConf(CredentialsProperties props, Configuration destConf) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java
index 0b495f7..d34f560 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java
@@ -23,7 +23,6 @@ import java.sql.DriverManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hive.jdbc.HiveConnection;
@@ -32,9 +31,9 @@ import org.apache.oozie.action.ActionExecutor.Context;
import org.apache.oozie.util.XLog;
/**
- * Credentials implementation to store in jobConf, Hive Server 2 specific properties
+ * Credentials implementation, Hive Server 2 specific properties
* User specifies these credential properties along with the action configuration
- * The jobConf is used further to pass credentials to the tasks while running
+ * The credentials are further used to pass tokens to the tasks while running
* Oozie server should be configured to use this class by including it via property 'oozie.credentials.credentialclasses'
* User can extend the parent class to implement own class as well
* for handling custom token-based credentials and add to the above server property
@@ -60,7 +59,7 @@ public class Hive2Credentials implements CredentialsProvider {
String principal = props.getProperties().get(HIVE2_SERVER_PRINCIPAL);
if (principal == null || principal.isEmpty()) {
throw new CredentialException(ErrorCode.E0510,
- HIVE2_SERVER_PRINCIPAL + " is required to get hive server 2 credential");
+ HIVE2_SERVER_PRINCIPAL + " is required to get hive server 2 credentials");
}
url = url + ";principal=" + principal;
Connection con = null;
@@ -79,12 +78,13 @@ public class Hive2Credentials implements CredentialsProvider {
Token<DelegationTokenIdentifier> hive2Token = new Token<DelegationTokenIdentifier>();
hive2Token.decodeFromUrlString(tokenStr);
- credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token);
- XLog.getLog(getClass()).debug("Added the Hive Server 2 token in job conf");
+ credentials.addToken(CredentialsProviderFactory.getUniqueAlias(hive2Token), hive2Token);
+ XLog.getLog(getClass()).debug("Added the Hive Server 2 token to launcher's credential");
}
catch (Exception e) {
- XLog.getLog(getClass()).warn("Exception in addtoJobConf", e);
+ XLog.getLog(getClass()).warn("Exception in obtaining Hive2 token", e);
throw e;
}
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/JHSCredentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JHSCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/JHSCredentials.java
new file mode 100644
index 0000000..d9099c5
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JHSCredentials.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.UserGroupInformationService;
+import org.apache.oozie.util.XLog;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+
+public class JHSCredentials implements CredentialsProvider {
+ protected XLog LOG = XLog.getLog(getClass());
+
+
+ /**
+ * Add an MR_DELEGATION_TOKEN to the {@link Credentials} provided.
+ * @param credentials the credentials object which is updated
+ * @param config launcher AM configuration
+ * @param props properties for getting credential token or certificate
+ * @param context workflow context
+ * @throws Exception thrown if failed
+ */
+ @Override
+ public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props,
+ ActionExecutor.Context context) throws Exception {
+ try {
+ LOG.debug("Instantiating JHS Proxy");
+ MRClientProtocol hsProxy = instantiateHistoryProxy(config, context);
+ Text hsService = SecurityUtil.buildTokenService(hsProxy.getConnectAddress());
+ LOG.debug("Getting delegation token for {0}", hsService.toString());
+ Token<?> jhsToken = getDelegationTokenFromJHS(hsProxy, new HadoopTokenHelper().getServerPrincipal(config));
+ LOG.debug("Acquired token {0}", jhsToken);
+ credentials.addToken(hsService, jhsToken);
+ } catch (IOException | InterruptedException ex) {
+ LOG.debug("exception in updateCredentials", ex);
+ throw new CredentialException(ErrorCode.E0512, ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Get a Delegation token from the JHS.
+ * Copied over from YARNRunner in Hadoop.
+ * @param hsProxy protocol used to get the token
+ * @return the MR_DELEGATION_TOKEN that can be used to talk to JHS
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private Token<?> getDelegationTokenFromJHS(final MRClientProtocol hsProxy, final String renewer)
+ throws IOException, InterruptedException {
+ GetDelegationTokenRequest request = RecordFactoryProvider
+ .getRecordFactory(null).newRecordInstance(GetDelegationTokenRequest.class);
+ LOG.debug("Creating requsest to JHS using renewer [{0}]", renewer);
+ request.setRenewer(renewer);
+ org.apache.hadoop.yarn.api.records.Token mrDelegationToken = hsProxy.getDelegationToken(request)
+ .getDelegationToken();
+ LOG.debug("Got token to JHS : {0}. Converting token.", mrDelegationToken);
+ return ConverterUtils.convertFromYarn(mrDelegationToken, hsProxy.getConnectAddress());
+ }
+
+ /**
+ * Create an MRClientProtocol to the JHS
+ * Copied over from ClientCache in Hadoop.
+ * @return the protocol that can be used to get a token
+ * @throws IOException
+ */
+ private MRClientProtocol instantiateHistoryProxy(final Configuration configuration, final ActionExecutor.Context context)
+ throws IOException {
+ final String serviceAddr = configuration.get(JHAdminConfig.MR_HISTORY_ADDRESS);
+ if (StringUtils.isEmpty(serviceAddr)) {
+ return null;
+ }
+ LOG.debug("Connecting to JHS at: " + serviceAddr);
+ final YarnRPC rpc = YarnRPC.create(configuration);
+ LOG.debug("Connected to JHS at: " + serviceAddr);
+ UserGroupInformation currentUser = Services.get().get(UserGroupInformationService.class)
+ .getProxyUser(context.getWorkflow().getUser());
+ return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
+ @Override
+ public MRClientProtocol run() {
+ return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
+ NetUtils.createSocketAddr(serviceAddr), configuration);
+ }
+ });
+ }
+}
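The token request itself is a small record built through the YARN record factory; a sketch of how it is assembled, mirroring getDelegationTokenFromJHS() above (the renewer principal here is illustrative):

    import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
    import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

    public class JhsRequestSketch {
        public static void main(String[] args) {
            // Same construction as in JHSCredentials.getDelegationTokenFromJHS().
            GetDelegationTokenRequest request = RecordFactoryProvider
                    .getRecordFactory(null)
                    .newRecordInstance(GetDelegationTokenRequest.class);
            request.setRenewer("jhs/_HOST@KDC.DOMAIN.COM"); // illustrative renewer
            System.out.println(request.getRenewer());
        }
    }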
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 9d1afb5..be05603 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.TaskLog;
@@ -60,8 +59,6 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -530,9 +527,9 @@ public class JavaActionExecutor extends ActionExecutor {
return conf;
}
catch (Exception ex) {
- LOG.debug(
- "Errors when add to DistributedCache. Path=" + Objects.toString(uri, "<null>") + ", archive="
- + archive + ", conf=" + XmlUtils.prettyPrint(conf).toString());
+ LOG.debug("Errors when add to DistributedCache. Path=" +
+ Objects.toString(uri, "<null>") + ", archive=" + archive + ", conf=" +
+ XmlUtils.prettyPrint(conf).toString());
throw convertException(ex);
}
}
@@ -1021,42 +1018,33 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- // Setting the credential properties in launcher conf
- Configuration credentialsConf = null;
-
+ Credentials credentials = new Credentials();
+ Configuration launcherConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
+ yarnClient = createYarnClient(context, launcherConf);
Map<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
action, actionConf);
- Credentials credentials = null;
- if (credentialsProperties != null) {
- credentials = new Credentials();
- // Adding if action need to set more credential tokens
- credentialsConf = new Configuration(false);
- XConfiguration.copy(actionConf, credentialsConf);
- setCredentialTokens(credentials, credentialsConf, context, action, credentialsProperties);
-
- // insert conf to action conf from credentialsConf
- for (Entry<String, String> entry : credentialsConf) {
- if (actionConf.get(entry.getKey()) == null) {
- actionConf.set(entry.getKey(), entry.getValue());
- }
- }
+ if (UserGroupInformation.isSecurityEnabled()) {
+ addHadoopCredentialPropertiesToActionConf(credentialsProperties);
}
- Configuration launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
+ // Adding if action need to set more credential tokens
+ Configuration credentialsConf = new Configuration(false);
+ XConfiguration.copy(actionConf, credentialsConf);
+ setCredentialTokens(credentials, credentialsConf, context, action, credentialsProperties);
+ // copy back new entries from credentialsConf
+ for (Entry<String, String> entry : credentialsConf) {
+ if (actionConf.get(entry.getKey()) == null) {
+ actionConf.set(entry.getKey(), entry.getValue());
+ }
+ }
String consoleUrl;
- String launcherId = LauncherHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
+ String launcherId = LauncherHelper.getRecoveryId(launcherConf, context.getActionDir(), context
.getRecoveryId());
boolean alreadyRunning = launcherId != null;
// if user-retry is on, always submit new launcher
boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
LOG.debug("Creating yarnClient for action {0}", action.getId());
- yarnClient = createYarnClient(context, launcherJobConf);
-
- if (UserGroupInformation.isSecurityEnabled()) {
- credentials = ensureCredentials(credentials);
- acquireHDFSDelegationToken(actionFs, credentialsConf, credentials);
- }
if (alreadyRunning && !isUserRetry) {
try {
@@ -1066,51 +1054,16 @@ public class JavaActionExecutor extends ActionExecutor {
} catch (RemoteException e) {
// caught when the application id does not exist
LOG.error("Got RemoteException from YARN", e);
- String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
+ String jobTracker = launcherConf.get(HADOOP_YARN_RM);
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
"unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
}
}
else {
- // TODO: OYA: do we actually need an MR token? IIRC, it's issued by the JHS
-// // setting up propagation of the delegation token.
-// Token<DelegationTokenIdentifier> mrdt = null;
-// HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
-// mrdt = jobClient.getDelegationToken(has
-// .getMRDelegationTokenRenewer(launcherJobConf));
-// launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
-
- // insert credentials tokens to launcher job conf if needed
- if (credentialsConf != null) {
- for (Token<? extends TokenIdentifier> tk :credentials.getAllTokens()) {
- Text fauxAlias = new Text(tk.getKind() + "_" + tk.getService());
- LOG.debug("ADDING TOKEN: " + fauxAlias);
- credentials.addToken(fauxAlias, tk);
- }
- if (credentials.numberOfSecretKeys() > 0) {
- for (Entry<String, CredentialsProperties> entry : credentialsProperties.entrySet()) {
- CredentialsProperties credProps = entry.getValue();
- if (credProps != null) {
- Text credName = new Text(credProps.getName());
- byte[] secKey = credentials.getSecretKey(credName);
- if (secKey != null) {
- LOG.debug("ADDING CREDENTIAL: " + credProps.getName());
- credentials.addSecretKey(credName, secKey);
- }
- }
- }
- }
- }
- else {
- LOG.info("No need to inject credentials.");
- }
-
- String user = context.getWorkflow().getUser();
-
YarnClientApplication newApp = yarnClient.createApplication();
ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
ApplicationSubmissionContext appContext =
- createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf, action.getName(),
+ createAppSubmissionContext(appId, launcherConf, context, actionConf, action.getName(),
credentials, actionXml);
yarnClient.submitApplication(appContext);
@@ -1120,7 +1073,7 @@ public class JavaActionExecutor extends ActionExecutor {
consoleUrl = appReport.getTrackingUrl();
}
- String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
+ String jobTracker = launcherConf.get(HADOOP_YARN_RM);
context.setStartData(launcherId, jobTracker, consoleUrl);
}
catch (Exception ex) {
@@ -1146,58 +1099,19 @@ public class JavaActionExecutor extends ActionExecutor {
return context.getVar(OOZIE_ACTION_NAME);
}
- private Credentials ensureCredentials(final Credentials credentials) {
- if (credentials == null) {
- LOG.debug("No credentials present, creating a new one.");
- return new Credentials();
- }
-
- return credentials;
+ private void addHadoopCredentialPropertiesToActionConf(Map<String, CredentialsProperties> credentialsProperties) {
+ LOG.info("Adding default credentials for action: hdfs, yarn and jhs");
+ addHadoopCredentialProperties(credentialsProperties, CredentialsProviderFactory.HDFS);
+ addHadoopCredentialProperties(credentialsProperties, CredentialsProviderFactory.YARN);
+ addHadoopCredentialProperties(credentialsProperties, CredentialsProviderFactory.JHS);
}
- /**
- * In a secure environment, when both HDFS HA and log aggregation are turned on, {@link JavaActionExecutor} is not able to call
- * {@link YarnClient#submitApplication} since {@code HDFS_DELEGATION_TOKEN} is missing.
- *
- * @param actionFs the {@link FileSystem} to get the delegation token from
- * @param credentialsConf the {@link Configuration} to extract the YARN renewer
- * @param credentials the {@link Credentials} where the delegation token is stored
- * @throws IOException
- * @throws ActionExecutorException when security is enabled, but either {@code credentials} are empty, or
- * {@code serverPrincipal} is empty, or HDFS delegation token is not present within {@code actionFs}
- */
- private void acquireHDFSDelegationToken(final FileSystem actionFs,
- final Configuration credentialsConf,
- final Credentials credentials)
- throws IOException, ActionExecutorException {
- LOG.debug("Security is enabled, checking credentials to acquire HDFS delegation token.");
-
- final HadoopAccessorService hadoopAccessorService = Services.get().get(HadoopAccessorService.class);
- final String servicePrincipal = hadoopAccessorService.getServicePrincipal(credentialsConf);
- final String serverPrincipal = hadoopAccessorService.getServerPrincipal(
- credentialsConf,
- servicePrincipal);
- if (serverPrincipal == null) {
- final String errorTemplate = "No server principal present, won't get HDFS delegation token. [servicePrincipal={0}]";
- LOG.error(errorTemplate, servicePrincipal);
- throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA022", errorTemplate, servicePrincipal);
- }
-
- LOG.debug("Server principal present, getting HDFS delegation token. [serverPrincipal={0}]", serverPrincipal);
- final Token hdfsDelegationToken = actionFs.getDelegationToken(serverPrincipal);
- if (hdfsDelegationToken == null) {
- final String errorTemplate = "No HDFS delegation token present, won't set credentials. [serverPrincipal={0}]";
- LOG.error(errorTemplate, serverPrincipal);
- throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA022", errorTemplate, serverPrincipal);
- }
-
- LOG.debug("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]",
- hdfsDelegationToken);
- credentials.addToken(new Text(hdfsDelegationToken.getService().toString()), hdfsDelegationToken);
+ private void addHadoopCredentialProperties(Map<String, CredentialsProperties> credentialsProperties, String type) {
+ credentialsProperties.put(type, new CredentialsProperties(type, type));
}
private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, Configuration launcherJobConf,
- String user, Context context, Configuration actionConf, String actionName,
+ Context context, Configuration actionConf, String actionName,
Credentials credentials, Element actionXml)
throws IOException, HadoopAccessorException, URISyntaxException {
@@ -1212,12 +1126,14 @@ public class JavaActionExecutor extends ActionExecutor {
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+ final String user = context.getWorkflow().getUser();
// Set the resources to localize
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(launcherJobConf);
MRApps.setupDistributedCache(launcherJobConf, localResources);
// Add the Launcher and Action configs as Resources
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+ launcherJobConf.set(LauncherAM.OOZIE_SUBMITTER_USER, user);
LocalResource launcherJobConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.LAUNCHER_JOB_CONF_XML, user,
launcherJobConf, context.getAppFileSystem().getUri(), context.getActionDir());
localResources.put(LauncherAM.LAUNCHER_JOB_CONF_XML, launcherJobConfLR);
@@ -1398,37 +1314,38 @@ public class JavaActionExecutor extends ActionExecutor {
return envMap;
}
- protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
- WorkflowAction action, Configuration actionConf) throws Exception {
- HashMap<String, CredentialsProperties> credPropertiesMap = null;
- if (context != null && action != null) {
- if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) {
- XConfiguration wfJobConf = getWorkflowConf(context);
- if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) ||
- !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) {
- credPropertiesMap = getActionCredentialsProperties(context, action);
- if (!credPropertiesMap.isEmpty()) {
- for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
- if (entry.getValue() != null) {
- CredentialsProperties prop = entry.getValue();
- LOG.debug("Credential Properties set for action : " + action.getId());
- for (Entry<String, String> propEntry : prop.getProperties().entrySet()) {
- String key = propEntry.getKey();
- String value = propEntry.getValue();
- actionConf.set(key, value);
- LOG.debug("property : '" + key + "', value : '" + value + "'");
- }
- }
- }
- } else {
- LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
+ Map<String, CredentialsProperties> setCredentialPropertyToActionConf(final Context context,
+ final WorkflowAction action,
+ final Configuration actionConf) throws Exception {
+ final Map<String, CredentialsProperties> credPropertiesMap = new HashMap<>();
+ if (context == null || action == null) {
+ LOG.warn("context or action is null");
+ return credPropertiesMap;
+ }
+ final XConfiguration wfJobConf = getWorkflowConf(context);
+ final boolean skipCredentials = actionConf.getBoolean(OOZIE_CREDENTIALS_SKIP,
+ wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP)));
+ if (skipCredentials) {
+ LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
+ } else {
+ credPropertiesMap.putAll(getActionCredentialsProperties(context, action));
+ if (credPropertiesMap.isEmpty()) {
+ LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
+ return credPropertiesMap;
+ }
+ for (final Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
+ if (entry.getValue() != null) {
+ final CredentialsProperties prop = entry.getValue();
+ LOG.debug("Credential Properties set for action : " + action.getId());
+ for (final Entry<String, String> propEntry : prop.getProperties().entrySet()) {
+ final String key = propEntry.getKey();
+ final String value = propEntry.getValue();
+ actionConf.set(key, value);
+ LOG.debug("property : '" + key + "', value : '" + value + "'");
}
- } else {
- LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
}
}
}
-
return credPropertiesMap;
}
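The refactored skip logic above resolves oozie.credentials.skip with a clear precedence: action configuration first, then workflow configuration, then the server-wide default. A standalone sketch of that chain (plain Configuration objects standing in for the three levels):

    import org.apache.hadoop.conf.Configuration;

    public class CredentialsSkipSketch {
        public static void main(String[] args) {
            Configuration serverConf = new Configuration(false); // stands in for oozie-site
            Configuration wfJobConf = new Configuration(false);
            Configuration actionConf = new Configuration(false);
            wfJobConf.setBoolean("oozie.credentials.skip", true);   // workflow asks to skip
            actionConf.setBoolean("oozie.credentials.skip", false); // action overrides
            boolean skip = actionConf.getBoolean("oozie.credentials.skip",
                    wfJobConf.getBoolean("oozie.credentials.skip",
                            serverConf.getBoolean("oozie.credentials.skip", false)));
            System.out.println(skip); // false: the action-level setting wins
        }
    }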
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/YarnRMCredentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/YarnRMCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/YarnRMCredentials.java
new file mode 100644
index 0000000..061ca05
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/YarnRMCredentials.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.XLog;
+
+public class YarnRMCredentials implements CredentialsProvider {
+ /**
+ * Add an RM_DELEGATION_TOKEN to the {@link Credentials} provided.
+ *
+ * @param credentials the credentials object which is updated
+ * @param config launcher AM configuration
+ * @param props properties for getting credential token or certificate
+ * @param context workflow context
+ * @throws Exception thrown if failed
+ */
+ @Override
+ public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props,
+ ActionExecutor.Context context) throws Exception {
+ Text rmDelegationTokenService = ClientRMProxy.getRMDelegationTokenService(config);
+ if (rmDelegationTokenService == null) {
+ throw new CredentialException(ErrorCode.E0512, "Can't create RMDelegationTokenService");
+ }
+ try (YarnClient yarnClient = Services.get().get(HadoopAccessorService.class)
+ .createYarnClient(context.getWorkflow().getUser(), config)) {
+ org.apache.hadoop.yarn.api.records.Token rmDelegationToken =
+ yarnClient.getRMDelegationToken(new Text(new HadoopTokenHelper().getServerPrincipal(config)));
+ if (rmDelegationToken == null) {
+ throw new CredentialException(ErrorCode.E0512, "Returned token is null");
+ }
+ Token<TokenIdentifier> rmToken = ConverterUtils.convertFromYarn(rmDelegationToken, rmDelegationTokenService);
+ credentials.addToken(rmDelegationTokenService, rmToken);
+ } catch (Exception e) {
+ XLog.getLog(getClass()).debug("Exception in updateCredentials", e);
+ throw e;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index 187cee2..73300a6 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -20,23 +20,38 @@ package org.apache.oozie.service;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Master;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.ParamChecker;
@@ -55,6 +70,7 @@ import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Comparator;
@@ -84,16 +100,8 @@ public class HadoopAccessorService implements Service {
public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled";
public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file";
public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
- public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");
-
- /** The Kerberos principal for the job tracker.*/
- protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
- /** The Kerberos principal for the resource manager.*/
- protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
- protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
- private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();
private static final String DEFAULT_ACTIONNAME = "default";
private static Configuration cachedConf;
@@ -335,7 +343,7 @@ public class HadoopAccessorService implements Service {
return HadoopAccessorService.class;
}
- private UserGroupInformation getUGI(String user) throws IOException {
+ UserGroupInformation getUGI(String user) throws IOException {
return ugiService.getProxyUser(user);
}
@@ -650,71 +658,6 @@ public class HadoopAccessorService implements Service {
}
}
- public Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
- if (UserGroupInformation.isSecurityEnabled()) { // secure cluster
- return getMRTokenRenewerInternal(jobConf);
- }
- else {
- return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
- }
- }
-
- // Package private for unit test purposes
- Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
- // Getting renewer correctly for JT principal also though JT in hadoop 1.x does not have
- // support for renewing/cancelling tokens
- final String servicePrincipal = getServicePrincipal(jobConf);
- Text renewer;
- if (servicePrincipal != null) { // secure cluster
- renewer = mrTokenRenewers.get(servicePrincipal);
- if (renewer == null) {
- renewer = new Text(getServerPrincipal(jobConf, servicePrincipal));
- mrTokenRenewers.put(servicePrincipal, renewer);
- }
- }
- else {
- renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer
- }
- return renewer;
- }
-
- public String getServicePrincipal(final Configuration configuration) {
- return configuration.get(RM_PRINCIPAL, configuration.get(JT_PRINCIPAL));
- }
-
- /**
- * Mimic {@link org.apache.hadoop.mapred.Master#getMasterPrincipal}, get Kerberos principal for use as delegation token renewer.
- *
- * @param configuration the {@link Configuration} containing the YARN RM address
- * @param servicePrincipal the configured service principal
- * @return the server principal originating from the host name and the service principal
- * @throws IOException when something goes wrong finding out the local address inside
- * {@link SecurityUtil#getServerPrincipal(String, String)}
- */
- public String getServerPrincipal(final Configuration configuration, final String servicePrincipal) throws IOException {
- Preconditions.checkNotNull(configuration, "configuration has to be filled");
- Preconditions.checkArgument(!Strings.isNullOrEmpty(servicePrincipal), "servicePrincipal has to be filled");
- Preconditions.checkArgument(!Strings.isNullOrEmpty(configuration.get(HADOOP_YARN_RM)),
- String.format("configuration entry %s has to be filled", HADOOP_YARN_RM));
-
- String serverPrincipal;
- final String target = configuration.get(HADOOP_YARN_RM);
-
- try {
- final String addr = NetUtils.createSocketAddr(target).getHostName();
- serverPrincipal = SecurityUtil.getServerPrincipal(servicePrincipal, addr);
- LOG.info("Delegation Token Renewer details: Principal={0},Target={1}", serverPrincipal, target);
- }
- catch (final IllegalArgumentException iae) {
- LOG.warn("An error happened while trying to get server principal. Getting it from service principal anyway.", iae);
-
- serverPrincipal = servicePrincipal.split("[/@]")[0];
- LOG.info("Delegation Token Renewer for {0} is {1}", target, serverPrincipal);
- }
-
- return serverPrincipal;
- }
-
public void addFileToClassPath(String user, final Path file, final Configuration conf)
throws IOException {
ParamChecker.notEmpty(user, "user");
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopTokenHelper.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopTokenHelper.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopTokenHelper.java
new file mode 100644
index 0000000..17cdd9d
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopTokenHelper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHadoopTokenHelper {
+
+ @Test
+ public void testGetMRDelegationTokenRenewer() throws Exception {
+ HadoopTokenHelper hadoopTokenHelper = new HadoopTokenHelper();
+ Configuration configuration = new Configuration(false);
+ configuration.set("yarn.resourcemanager.address", "localhost:8032");
+ configuration.set("yarn.resourcemanager.principal", "rm/server.com@KDC.DOMAIN.COM");
+ assertEquals("yarn setup","rm/server.com@KDC.DOMAIN.COM",
+ hadoopTokenHelper.getServerPrincipal(configuration));
+
+ configuration = new Configuration(false);
+ configuration.set("yarn.resourcemanager.address", "rm-ha-uri");
+ configuration.set("yarn.resourcemanager.principal", "rm/server.com@KDC.DOMAIN.COM");
+ assertEquals("yarn ha setup","rm",
+ hadoopTokenHelper.getServerPrincipal(configuration));
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 02e60c0..0d1dd97 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -1083,7 +1083,6 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
// Define 'abc' token type in oozie-site
ConfigurationService.set("oozie.credentials.credentialclasses", "abc=org.apache.oozie.action.hadoop.InsertTestToken");
ConfigurationService.setBoolean("oozie.credentials.skip", skipSite);
-
// Setting the credential properties in launcher conf
Map<String, CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context,
action, actionConf);
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
index a7d6c18..de67365 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
@@ -48,7 +48,7 @@ public class TestShellMain extends ShellTestCase {
jobConf.setInt("mapred.reduce.max.attempts", 1);
jobConf.set("mapred.job.tracker", getJobTrackerUri());
jobConf.set("fs.default.name", getNameNodeUri());
-
+ jobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, System.currentTimeMillis());
jobConf.set(ShellMain.CONF_OOZIE_SHELL_EXEC, SHELL_COMMAND_NAME);
String[] args = new String[] { SHELL_COMMAND_SCRIPTFILE_OPTION, script.toString(), "A", "B" };
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
index cf77f18..98a41a1 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
@@ -180,7 +180,7 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
jobConf = sleepjob.setupJobConf(1, 1, sleep, 1, sleep, 1);
jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, "sleepjob");
jobConf.set(LauncherMain.MAPREDUCE_JOB_TAGS, "sleepjob");
- System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis()));
+ jobConf.set(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis()));
jobClient.submitJob(new JobConf(jobConf));
Set<ApplicationId> apps = LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL);
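
Both launch-time changes above (here and in TestShellMain) make the same switch: the timestamp now travels in the job configuration instead of a JVM-wide system property, so parallel launchers and tests no longer share mutable global state. A minimal consumer-side sketch; the current-time fallback below is an assumption for illustration:

    // Sketch: read the launch time from the configuration rather than
    // System.getProperty(); the current-time fallback is an assumed default.
    long launchTime = jobConf.getLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME,
            System.currentTimeMillis());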
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
index 960c2f9..89ce185 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
@@ -228,37 +228,6 @@ public class TestHadoopAccessorService extends XFsTestCase {
}
}
- public void testGetMRDelegationTokenRenewer() throws Exception {
- HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
- JobConf jobConf = new JobConf(false);
- assertEquals(new Text("oozie mr token"), has.getMRTokenRenewerInternal(jobConf));
- jobConf.set("yarn.resourcemanager.address", "localhost:50300");
- jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_HOST@KDC.DOMAIN.COM");
- assertEquals(new Text("mapred/localhost@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
- jobConf = new JobConf(false);
- jobConf.set("mapreduce.jobtracker.address", "127.0.0.1:50300");
- jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_HOST@KDC.DOMAIN.COM");
- assertEquals(new Text("mapred/localhost@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
- jobConf = new JobConf(false);
- jobConf.set("yarn.resourcemanager.address", "localhost:8032");
- jobConf.set("yarn.resourcemanager.principal", "rm/server.com@KDC.DOMAIN.COM");
- assertEquals(new Text("rm/server.com@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
-
- // Try the above with logical URIs (i.e. for HA)
- jobConf = new JobConf(false);
- jobConf.set("mapred.job.tracker", "jt-ha-uri");
- jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_HOST@KDC.DOMAIN.COM");
- assertEquals(new Text("mapred/localhost@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
- jobConf = new JobConf(false);
- jobConf.set("mapreduce.jobtracker.address", "jt-ha-uri");
- jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_HOST@KDC.DOMAIN.COM");
- assertEquals(new Text("mapred/localhost@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
- jobConf = new JobConf(false);
- jobConf.set("yarn.resourcemanager.address", "rm-ha-uri");
- jobConf.set("yarn.resourcemanager.principal", "rm/server.com@KDC.DOMAIN.COM");
- assertEquals(new Text("rm/server.com@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
- }
-
public void testCheckSupportedFilesystem() throws Exception {
Configuration hConf = Services.get().getConf();
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 82621ca..bacd686 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.0.0 release (trunk - unreleased)
+OOZIE-2909 LauncherAM: rewrite UGI calls (gezapeti)
OOZIE-2687 Create XML schema for launcher configurations (asasvari)
OOZIE-3041 TestWorkflowActionRetryInfoXCommand fails in oozie core module (andras.piros via gezapeti)
OOZIE-2916 Set a job name for the MR Action's child job (asasvari)
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
index b4cbb4b..bfb3d76 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
@@ -22,11 +22,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
public class AMRMClientAsyncFactory {
- public AMRMClientAsync<?> createAMRMClientAsync(int intervalMs) {
+ public AMRMClientAsync<?> createAMRMClientAsync(int intervalMs, AMRMCallBackHandler callBackHandler) {
AMRMClient<?> amRmClient = AMRMClient.createAMRMClient();
- AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler();
AMRMClientAsync<?> amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, intervalMs, callBackHandler);
-
return amRmClientAsync;
}
}
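
Passing the AMRMCallBackHandler into the factory, rather than constructing it inside, lets the caller keep a reference to the handler (for example, to poll it for errors) and lets tests inject a mock. A usage sketch; the 1000 ms heartbeat interval and the bare Configuration are example values:

    // Usage sketch with example values; AMRMClientAsync is a Hadoop service,
    // so it is initialized and started after creation.
    AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler();
    AMRMClientAsync<?> amRmClientAsync =
            new AMRMClientAsyncFactory().createAMRMClientAsync(1000, callBackHandler);
    amRmClientAsync.init(new Configuration());
    amRmClientAsync.start();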
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
index 874d371..fcb0a92 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
@@ -40,85 +40,59 @@ import com.google.common.base.Preconditions;
public class HdfsOperations {
private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
private final SequenceFileWriterFactory seqFileWriterFactory;
- private final UserGroupInformation ugi;
- public HdfsOperations(SequenceFileWriterFactory seqFileWriterFactory, UserGroupInformation ugi) {
+ public HdfsOperations(SequenceFileWriterFactory seqFileWriterFactory) {
this.seqFileWriterFactory = Preconditions.checkNotNull(seqFileWriterFactory, "seqFileWriterFactory should not be null");
- this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null");
}
/**
* Creates a Sequence file which contains the output from an action and uploads it to HDFS.
*/
public void uploadActionDataToHDFS(final Configuration launcherJobConf, final Path actionDir,
- final Map<String, String> actionData) throws IOException, InterruptedException {
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- Path finalPath = new Path(actionDir, LauncherAM.ACTION_DATA_SEQUENCE_FILE);
- // upload into sequence file
- System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " + finalPath.toUri());
+ final Map<String, String> actionData) throws IOException, InterruptedException {
+ Path finalPath = new Path(actionDir, LauncherAM.ACTION_DATA_SEQUENCE_FILE);
+ // upload into sequence file
+ System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " + finalPath.toUri());
- try (SequenceFile.Writer wr =
- seqFileWriterFactory.createSequenceFileWriter(launcherJobConf, finalPath, Text.class, Text.class)) {
+ try (SequenceFile.Writer wr =
+ seqFileWriterFactory.createSequenceFileWriter(launcherJobConf, finalPath, Text.class, Text.class)) {
- if (wr != null) {
- for (Entry<String, String> entry : actionData.entrySet()) {
- wr.append(new Text(entry.getKey()), new Text(entry.getValue()));
- }
- } else {
- throw new IOException("SequenceFile.Writer is null for " + finalPath);
- }
+ if (wr != null) {
+ for (Entry<String, String> entry : actionData.entrySet()) {
+ wr.append(new Text(entry.getKey()), new Text(entry.getValue()));
}
-
- return null;
+ } else {
+ throw new IOException("SequenceFile.Writer is null for " + finalPath);
}
- });
+ }
}
public boolean fileExists(final Path path, final Configuration launcherJobConf) throws IOException, InterruptedException {
- return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
- @Override
- public Boolean run() throws Exception {
- FileSystem fs = FileSystem.get(path.toUri(), launcherJobConf);
- return fs.exists(path);
- }
- });
+ FileSystem fs = FileSystem.get(path.toUri(), launcherJobConf);
+ return fs.exists(path);
}
public void writeStringToFile(final Path path, final Configuration conf, final String contents)
throws IOException, InterruptedException {
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- try (FileSystem fs = FileSystem.get(path.toUri(), conf);
- java.io.Writer writer = new OutputStreamWriter(fs.create(path), DEFAULT_CHARSET)) {
- writer.write(contents);
- }
-
- return null;
- }
- });
+ try (FileSystem fs = FileSystem.get(path.toUri(), conf);
+ java.io.Writer writer = new OutputStreamWriter(fs.create(path), DEFAULT_CHARSET)) {
+ writer.write(contents);
+ }
}
public String readFileContents(final Path path, final Configuration conf) throws IOException, InterruptedException {
- return ugi.doAs(new PrivilegedExceptionAction<String>() {
- @Override
- public String run() throws Exception {
- StringBuilder sb = new StringBuilder();
-
- try (FileSystem fs = FileSystem.get(path.toUri(), conf);
- InputStream is = fs.open(path);
- BufferedReader reader = new BufferedReader(new InputStreamReader(is, DEFAULT_CHARSET))) {
+ StringBuilder sb = new StringBuilder();
- String contents;
- while ((contents = reader.readLine()) != null) {
- sb.append(contents);
- }
- }
+ try (FileSystem fs = FileSystem.get(path.toUri(), conf);
+ InputStream is = fs.open(path);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is, DEFAULT_CHARSET))) {
- return sb.toString();
+ String contents;
+ while ((contents = reader.readLine()) != null) {
+ sb.append(contents);
}
- });
+ }
+
+ return sb.toString();
}
}
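
With the UserGroupInformation dependency removed, HdfsOperations now issues plain FileSystem calls; in line with the commit summary, the privileged context is expected to be established once by the caller (the LauncherAM) rather than per operation. A caller-side sketch, assuming 'ugi', 'hdfsOperations', and the arguments are already set up by the surrounding code:

    // Caller-side sketch: the doAs wrapping that used to live inside each
    // HdfsOperations method now happens once, around the whole operation.
    // Requires java.security.PrivilegedExceptionAction.
    ugi.doAs(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws Exception {
            hdfsOperations.uploadActionDataToHDFS(launcherJobConf, actionDir, actionData);
            return null;
        }
    });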