You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/05/14 02:10:31 UTC
[2/2] tez git commit: Revert "TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)"
Revert "TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)"
This reverts commit f0b9d7ec7472d6c4b932c249f6106c5b6ca85b88.
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2467d608
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2467d608
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2467d608
Branch: refs/heads/master
Commit: 2467d60896e5c99536fe4e64b58c6f57dc1587ec
Parents: 2df32bf
Author: Jeff Zhang <zj...@apache.org>
Authored: Thu May 14 08:10:12 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu May 14 08:10:12 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 5 +-
.../tez/dag/api/client/DAGClientImpl.java | 5 +
.../dag/api/client/DAGClientTimelineImpl.java | 187 ++-----------------
.../tez/dag/api/client/TestATSHttpClient.java | 2 +-
4 files changed, 29 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/2467d608/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 92c4df2..c5acc21 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,7 +24,6 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
TEZ-2445. Disable the object cleanup in local mode in LogicalIOProcessorRuntimeTask.
TEZ-2057. tez-dag/pom.xml contains versions for dependencies.
- TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
TEZ-2435. Add public key to KEYS
TEZ-2421. Deadlock in AM because attempt and vertex locking each other out
TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts.
@@ -195,9 +194,12 @@ Release 0.6.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+<<<<<<< HEAD
TEZ-2057. tez-dag/pom.xml contains versions for dependencies.
TEZ-2282. Delimit reused yarn container logs (stderr, stdout, syslog) with task attempt start/stop events
TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
+=======
+>>>>>>> parent of f0b9d7e... TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)
TEZ-2396. pig-tez-tfile-parser pom is hard coded to depend on 0.6.0-SNAPSHOT version.
TEZ-2237. Valid events should be sent out when an Output is not started.
TEZ-1988. Tez UI: does not work when using file:// in a browser
@@ -374,7 +376,6 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
- TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
TEZ-2369. Add a few unit tests for RootInputInitializerManager.
TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:
Invalid event: T_ATTEMPT_KILLED at KILLED.
http://git-wip-us.apache.org/repos/asf/tez/blob/2467d608/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index b0ad51c..de6ede6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -93,6 +93,11 @@ public class DAGClientImpl extends DAGClient {
conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
+ if (UserGroupInformation.isSecurityEnabled()){
+ //TODO: enable ATS integration in kerberos secured cluster - see TEZ-1529
+ isATSEnabled = false;
+ }
+
realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
statusPollInterval = conf.getLong(
TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS,
http://git-wip-us.apache.org/repos/asf/tez/blob/2467d608/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index d0b11d6..cc000df 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -19,20 +19,13 @@
package org.apache.tez.dag.api.client;
import javax.annotation.Nullable;
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLSocketFactory;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.lang.reflect.UndeclaredThrowableException;
import java.net.HttpURLConnection;
-import java.net.URI;
import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
-import java.security.PrivilegedExceptionAction;
+import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -52,14 +45,6 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -96,13 +81,8 @@ public class DAGClientTimelineImpl extends DAGClient {
private static Client httpClient = null;
private final ApplicationId appId;
private final String dagId;
+ private final TezConfiguration conf;
private final FrameworkClient frameworkClient;
- private final UserGroupInformation authUgi;
- private final String doAsUser;
- private final DelegationTokenAuthenticator authenticator;
- private final DelegationTokenAuthenticatedURL.Token token;
- private final ConnectionConfigurator connConfigurator;
- private final static int DEFAULT_SOCKET_TIMEOUT = 30 * 1000; // 30 seconds
private Map<String, VertexTaskStats> vertexTaskStatsCache = null;
@@ -111,9 +91,10 @@ public class DAGClientTimelineImpl extends DAGClient {
public DAGClientTimelineImpl(ApplicationId appId, String dagId, TezConfiguration conf,
FrameworkClient frameworkClient)
- throws TezException, IOException {
+ throws TezException {
this.appId = appId;
this.dagId = dagId;
+ this.conf = conf;
this.frameworkClient = frameworkClient;
String scheme;
@@ -130,29 +111,9 @@ public class DAGClientTimelineImpl extends DAGClient {
}
baseUri = Joiner.on("").join(scheme, webAppAddress, ATSConstants.RESOURCE_URI_BASE);
-
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- UserGroupInformation realUgi = ugi.getRealUser();
- if (realUgi != null) {
- authUgi = realUgi;
- doAsUser = ugi.getShortUserName();
- } else {
- authUgi = ugi;
- doAsUser = null;
- }
-
-
- if (UserGroupInformation.isSecurityEnabled()) {
- authenticator = new KerberosDelegationTokenAuthenticator();
- } else {
- authenticator = new PseudoDelegationTokenAuthenticator();
- }
-
- connConfigurator = newConnConfigurator(conf);
- authenticator.setConnectionConfigurator(connConfigurator);
- token = new DelegationTokenAuthenticatedURL.Token();
}
+
@Override
public String getExecutionContext() {
return "Executing on YARN cluster with App id " + appId;
@@ -164,13 +125,7 @@ public class DAGClientTimelineImpl extends DAGClient {
try {
appReport = frameworkClient.getApplicationReport(appId);
} catch (YarnException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("error getting application report", e);
- }
} catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("error getting application report", e);
- }
}
return appReport;
}
@@ -457,13 +412,8 @@ public class DAGClientTimelineImpl extends DAGClient {
.type(MediaType.APPLICATION_JSON_TYPE)
.get(ClientResponse.class);
- final ClientResponse.Status clientResponseStatus = response.getClientResponseStatus();
- if (clientResponseStatus != ClientResponse.Status.OK) {
- if (clientResponseStatus == ClientResponse.Status.UNAUTHORIZED) {
- httpClient = null;
- }
- throw new TezException("Failed to get response from YARN Timeline: url: " + url +
- " error: " + clientResponseStatus);
+ if (response.getClientResponseStatus() != ClientResponse.Status.OK) {
+ throw new TezException("Failed to get response from YARN Timeline: url: " + url);
}
return response.getEntity(JSONObject.class);
@@ -473,8 +423,6 @@ public class DAGClientTimelineImpl extends DAGClient {
throw new TezException("Error accessing content from YARN Timeline - unexpected response", e);
} catch (IllegalArgumentException e) {
throw new TezException("Error accessing content from YARN Timeline - invalid url", e);
- } catch (IOException e) {
- throw new TezException("Error getting http client connection", e);
}
}
@@ -512,119 +460,15 @@ public class DAGClientTimelineImpl extends DAGClient {
}
}
- protected Client getHttpClient() throws IOException, TezException {
+ protected Client getHttpClient() {
if (httpClient == null) {
- if (UserGroupInformation.isSecurityEnabled()) {
- final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- try {
- final Token<?> delegationToken = getDelegationToken(currentUser.getUserName());
- currentUser.addToken(delegationToken);
- } catch (UndeclaredThrowableException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("exception getting httpclient token", e);
- }
- }
- }
-
- ClientConfig clientConfig = new DefaultClientConfig(JSONRootElementProvider.App.class);
- HttpURLConnectionFactory connectionFactory = new TimelineURLConnectionFactory();
- httpClient = new Client(new URLConnectionClientHandler(connectionFactory), clientConfig);
+ ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
+ HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory();
+ httpClient = new Client(new URLConnectionClientHandler(urlFactory), config);
}
return httpClient;
}
- private Token<?> getDelegationToken(final String renewer) throws
- IOException, TezException {
- authUgi.checkTGTAndReloginFromKeytab();
- try {
- return authUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
- @Override
- public Token<?> run() throws IOException, AuthenticationException {
- try {
- URI resURI = URI.create(baseUri);
- DelegationTokenAuthenticatedURL authUrl =
- new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
- return (Token) authUrl.getDelegationToken(resURI.toURL(), token, renewer, doAsUser);
- } catch (IllegalArgumentException e) {
- throw new IOException("invalid url " + baseUri, e);
- }
- }
- });
- } catch (InterruptedException e) {
- throw new TezException(e);
- }
- }
-
- private class TimelineURLConnectionFactory implements HttpURLConnectionFactory {
-
- @Override
- public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
- try {
- return new DelegationTokenAuthenticatedURL(
- authenticator, connConfigurator).openConnection(url, token,
- doAsUser);
- } catch (UndeclaredThrowableException e) {
- throw new IOException(e.getCause());
- } catch (AuthenticationException ae) {
- throw new IOException(ae);
- }
- }
-
- }
-
- private static ConnectionConfigurator newConnConfigurator(Configuration conf) {
- try {
- return newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
- } catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cannot load customized ssl related configuration. " +
- "Fallback to system-generic settings.", e);
- }
- return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
- }
- }
-
- private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
- new ConnectionConfigurator() {
- @Override
- public HttpURLConnection configure(HttpURLConnection conn)
- throws IOException {
- setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
- return conn;
- }
- };
-
- private static ConnectionConfigurator newSslConnConfigurator(final int timeout, Configuration conf)
- throws IOException, GeneralSecurityException {
- final SSLFactory factory;
- final SSLSocketFactory sf;
- final HostnameVerifier hv;
-
- factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
- factory.init();
- sf = factory.createSSLSocketFactory();
- hv = factory.getHostnameVerifier();
-
- return new ConnectionConfigurator() {
- @Override
- public HttpURLConnection configure(HttpURLConnection conn)
- throws IOException {
- if (conn instanceof HttpsURLConnection) {
- HttpsURLConnection c = (HttpsURLConnection) conn;
- c.setSSLSocketFactory(sf);
- c.setHostnameVerifier(hv);
- }
- setTimeouts(conn, timeout);
- return conn;
- }
- };
- }
-
- private static void setTimeouts(URLConnection connection, int socketTimeout) {
- connection.setConnectTimeout(socketTimeout);
- connection.setReadTimeout(socketTimeout);
- }
-
private static final Map<String, DAGStatusStateProto> dagStateProtoMap =
Collections.unmodifiableMap(new HashMap<String, DAGStatusStateProto>() {{
put("NEW", DAGStatusStateProto.DAG_SUBMITTED);
@@ -654,6 +498,15 @@ public class DAGClientTimelineImpl extends DAGClient {
}});
+ static class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
+ @Override
+ public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+ String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" +
+ URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
+ return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
+ }
+ }
+
@Override
public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
long timeout) throws IOException, TezException {
http://git-wip-us.apache.org/repos/asf/tez/blob/2467d608/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
index aafc28f..a72b799 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
@@ -53,7 +53,7 @@ public class TestATSHttpClient {
}
@Test(timeout = 5000)
- public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException, IOException {
+ public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException {
ApplicationId mockAppId = mock(ApplicationId.class);
DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mockAppId, "EXAMPLE_DAG_ID",
new TezConfiguration(), null);