You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/05/11 20:45:22 UTC
tez git commit: TEZ-1529. ATS and TezClient integration in secure
kerberos enabled cluster. (Prakash Ramachandran via hitesh)
Repository: tez
Updated Branches:
refs/heads/master e00b44a80 -> f0b9d7ec7
TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f0b9d7ec
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f0b9d7ec
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f0b9d7ec
Branch: refs/heads/master
Commit: f0b9d7ec7472d6c4b932c249f6106c5b6ca85b88
Parents: e00b44a
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon May 11 11:44:53 2015 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon May 11 11:44:53 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../tez/dag/api/client/DAGClientImpl.java | 5 -
.../dag/api/client/DAGClientTimelineImpl.java | 187 +++++++++++++++++--
.../tez/dag/api/client/TestATSHttpClient.java | 2 +-
4 files changed, 171 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f0b9d7ec/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4c2baf..265281f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
Default max limit increased. Should not affect existing users.
ALL CHANGES:
+ TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
TEZ-2435. Add public key to KEYS
TEZ-2421. Deadlock in AM because attempt and vertex locking each other out
TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts.
@@ -190,6 +191,7 @@ Release 0.6.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
TEZ-2396. pig-tez-tfile-parser pom is hard coded to depend on 0.6.0-SNAPSHOT version.
TEZ-2237. Valid events should be sent out when an Output is not started.
TEZ-1988. Tez UI: does not work when using file:// in a browser
@@ -366,6 +368,7 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
TEZ-2369. Add a few unit tests for RootInputInitializerManager.
TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:
Invalid event: T_ATTEMPT_KILLED at KILLED.
http://git-wip-us.apache.org/repos/asf/tez/blob/f0b9d7ec/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index de6ede6..b0ad51c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -93,11 +93,6 @@ public class DAGClientImpl extends DAGClient {
conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
- if (UserGroupInformation.isSecurityEnabled()){
- //TODO: enable ATS integration in kerberos secured cluster - see TEZ-1529
- isATSEnabled = false;
- }
-
realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
statusPollInterval = conf.getLong(
TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS,
http://git-wip-us.apache.org/repos/asf/tez/blob/f0b9d7ec/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index cc000df..d0b11d6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -19,13 +19,20 @@
package org.apache.tez.dag.api.client;
import javax.annotation.Nullable;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.lang.reflect.UndeclaredThrowableException;
import java.net.HttpURLConnection;
+import java.net.URI;
import java.net.URL;
-import java.net.URLEncoder;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -45,6 +52,14 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -81,8 +96,13 @@ public class DAGClientTimelineImpl extends DAGClient {
private static Client httpClient = null;
private final ApplicationId appId;
private final String dagId;
- private final TezConfiguration conf;
private final FrameworkClient frameworkClient;
+ private final UserGroupInformation authUgi;
+ private final String doAsUser;
+ private final DelegationTokenAuthenticator authenticator;
+ private final DelegationTokenAuthenticatedURL.Token token;
+ private final ConnectionConfigurator connConfigurator;
+ private final static int DEFAULT_SOCKET_TIMEOUT = 30 * 1000; // 30 seconds
private Map<String, VertexTaskStats> vertexTaskStatsCache = null;
@@ -91,10 +111,9 @@ public class DAGClientTimelineImpl extends DAGClient {
public DAGClientTimelineImpl(ApplicationId appId, String dagId, TezConfiguration conf,
FrameworkClient frameworkClient)
- throws TezException {
+ throws TezException, IOException {
this.appId = appId;
this.dagId = dagId;
- this.conf = conf;
this.frameworkClient = frameworkClient;
String scheme;
@@ -111,8 +130,28 @@ public class DAGClientTimelineImpl extends DAGClient {
}
baseUri = Joiner.on("").join(scheme, webAppAddress, ATSConstants.RESOURCE_URI_BASE);
- }
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ UserGroupInformation realUgi = ugi.getRealUser();
+ if (realUgi != null) {
+ authUgi = realUgi;
+ doAsUser = ugi.getShortUserName();
+ } else {
+ authUgi = ugi;
+ doAsUser = null;
+ }
+
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ authenticator = new KerberosDelegationTokenAuthenticator();
+ } else {
+ authenticator = new PseudoDelegationTokenAuthenticator();
+ }
+
+ connConfigurator = newConnConfigurator(conf);
+ authenticator.setConnectionConfigurator(connConfigurator);
+ token = new DelegationTokenAuthenticatedURL.Token();
+ }
@Override
public String getExecutionContext() {
@@ -125,7 +164,13 @@ public class DAGClientTimelineImpl extends DAGClient {
try {
appReport = frameworkClient.getApplicationReport(appId);
} catch (YarnException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("error getting application report", e);
+ }
} catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("error getting application report", e);
+ }
}
return appReport;
}
@@ -412,8 +457,13 @@ public class DAGClientTimelineImpl extends DAGClient {
.type(MediaType.APPLICATION_JSON_TYPE)
.get(ClientResponse.class);
- if (response.getClientResponseStatus() != ClientResponse.Status.OK) {
- throw new TezException("Failed to get response from YARN Timeline: url: " + url);
+ final ClientResponse.Status clientResponseStatus = response.getClientResponseStatus();
+ if (clientResponseStatus != ClientResponse.Status.OK) {
+ if (clientResponseStatus == ClientResponse.Status.UNAUTHORIZED) {
+ httpClient = null;
+ }
+ throw new TezException("Failed to get response from YARN Timeline: url: " + url +
+ " error: " + clientResponseStatus);
}
return response.getEntity(JSONObject.class);
@@ -423,6 +473,8 @@ public class DAGClientTimelineImpl extends DAGClient {
throw new TezException("Error accessing content from YARN Timeline - unexpected response", e);
} catch (IllegalArgumentException e) {
throw new TezException("Error accessing content from YARN Timeline - invalid url", e);
+ } catch (IOException e) {
+ throw new TezException("Error getting http client connection", e);
}
}
@@ -460,15 +512,119 @@ public class DAGClientTimelineImpl extends DAGClient {
}
}
- protected Client getHttpClient() {
+ protected Client getHttpClient() throws IOException, TezException {
if (httpClient == null) {
- ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
- HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory();
- httpClient = new Client(new URLConnectionClientHandler(urlFactory), config);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ try {
+ final Token<?> delegationToken = getDelegationToken(currentUser.getUserName());
+ currentUser.addToken(delegationToken);
+ } catch (UndeclaredThrowableException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("exception getting httpclient token", e);
+ }
+ }
+ }
+
+ ClientConfig clientConfig = new DefaultClientConfig(JSONRootElementProvider.App.class);
+ HttpURLConnectionFactory connectionFactory = new TimelineURLConnectionFactory();
+ httpClient = new Client(new URLConnectionClientHandler(connectionFactory), clientConfig);
}
return httpClient;
}
+ private Token<?> getDelegationToken(final String renewer) throws
+ IOException, TezException {
+ authUgi.checkTGTAndReloginFromKeytab();
+ try {
+ return authUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
+ @Override
+ public Token<?> run() throws IOException, AuthenticationException {
+ try {
+ URI resURI = URI.create(baseUri);
+ DelegationTokenAuthenticatedURL authUrl =
+ new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
+ return (Token) authUrl.getDelegationToken(resURI.toURL(), token, renewer, doAsUser);
+ } catch (IllegalArgumentException e) {
+ throw new IOException("invalid url " + baseUri, e);
+ }
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new TezException(e);
+ }
+ }
+
+ private class TimelineURLConnectionFactory implements HttpURLConnectionFactory {
+
+ @Override
+ public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
+ try {
+ return new DelegationTokenAuthenticatedURL(
+ authenticator, connConfigurator).openConnection(url, token,
+ doAsUser);
+ } catch (UndeclaredThrowableException e) {
+ throw new IOException(e.getCause());
+ } catch (AuthenticationException ae) {
+ throw new IOException(ae);
+ }
+ }
+
+ }
+
+ private static ConnectionConfigurator newConnConfigurator(Configuration conf) {
+ try {
+ return newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot load customized ssl related configuration. " +
+ "Fallback to system-generic settings.", e);
+ }
+ return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
+ }
+ }
+
+ private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
+ new ConnectionConfigurator() {
+ @Override
+ public HttpURLConnection configure(HttpURLConnection conn)
+ throws IOException {
+ setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
+ return conn;
+ }
+ };
+
+ private static ConnectionConfigurator newSslConnConfigurator(final int timeout, Configuration conf)
+ throws IOException, GeneralSecurityException {
+ final SSLFactory factory;
+ final SSLSocketFactory sf;
+ final HostnameVerifier hv;
+
+ factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+ factory.init();
+ sf = factory.createSSLSocketFactory();
+ hv = factory.getHostnameVerifier();
+
+ return new ConnectionConfigurator() {
+ @Override
+ public HttpURLConnection configure(HttpURLConnection conn)
+ throws IOException {
+ if (conn instanceof HttpsURLConnection) {
+ HttpsURLConnection c = (HttpsURLConnection) conn;
+ c.setSSLSocketFactory(sf);
+ c.setHostnameVerifier(hv);
+ }
+ setTimeouts(conn, timeout);
+ return conn;
+ }
+ };
+ }
+
+ private static void setTimeouts(URLConnection connection, int socketTimeout) {
+ connection.setConnectTimeout(socketTimeout);
+ connection.setReadTimeout(socketTimeout);
+ }
+
private static final Map<String, DAGStatusStateProto> dagStateProtoMap =
Collections.unmodifiableMap(new HashMap<String, DAGStatusStateProto>() {{
put("NEW", DAGStatusStateProto.DAG_SUBMITTED);
@@ -498,15 +654,6 @@ public class DAGClientTimelineImpl extends DAGClient {
}});
- static class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
- @Override
- public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
- String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" +
- URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
- return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
- }
- }
-
@Override
public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
long timeout) throws IOException, TezException {
http://git-wip-us.apache.org/repos/asf/tez/blob/f0b9d7ec/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
index a72b799..aafc28f 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
@@ -53,7 +53,7 @@ public class TestATSHttpClient {
}
@Test(timeout = 5000)
- public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException {
+ public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException, IOException {
ApplicationId mockAppId = mock(ApplicationId.class);
DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mockAppId, "EXAMPLE_DAG_ID",
new TezConfiguration(), null);