You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/06/02 01:36:42 UTC
[04/50] [abbrv] tez git commit: TEZ-1529. ATS and TezClient
integration in secure kerberos enabled cluster. (pramachandran)
TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (pramachandran)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8710df0d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8710df0d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8710df0d
Branch: refs/heads/TEZ-2003
Commit: 8710df0d1264a453218220ed48e5d2b5d2923da1
Parents: dac59a2
Author: Prakash Ramachandran <pr...@hortonworks.com>
Authored: Wed May 27 17:59:17 2015 +0530
Committer: Prakash Ramachandran <pr...@hortonworks.com>
Committed: Wed May 27 17:59:17 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/ReflectionUtils.java | 19 +
.../tez/dag/api/client/DAGClientImpl.java | 9 +-
.../dag/api/client/DAGClientTimelineImpl.java | 55 ++-
.../dag/api/client/TimelineReaderFactory.java | 387 +++++++++++++++++++
.../tez/dag/api/client/TestATSHttpClient.java | 6 +-
.../api/client/TestTimelineReaderFactory.java | 91 +++++
7 files changed, 529 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/8710df0d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 484f78d..513285f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
TEZ-2481. Tez UI: graphical view does not render properly on IE11
TEZ-2474. The old taskNum is logged incorrectly when parallelism is changed
TEZ-2460. Temporary solution for issue due to YARN-2560
http://git-wip-us.apache.org/repos/asf/tez/blob/8710df0d/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java b/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
index 0fc529b..f1eb0ae 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
@@ -100,6 +100,25 @@ public class ReflectionUtils {
}
@Private
+ @SuppressWarnings("unchecked")
+ public static <T> T invokeMethod(Object target, Method method, Object... args) {
+ try {
+ return (T) method.invoke(target, args);
+ } catch (Exception e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+
+ @Private
+ public static Method getMethod(Class<?> targetClazz, String methodName, Class<?>... parameterTypes) {
+ try {
+ return targetClazz.getMethod(methodName, parameterTypes);
+ } catch (NoSuchMethodException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+
+ @Private
public static synchronized void addResourcesToClasspath(List<URL> urls) {
ClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Thread
.currentThread().getContextClassLoader());
http://git-wip-us.apache.org/repos/asf/tez/blob/8710df0d/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index baacdb9..66fc986 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -33,7 +33,6 @@ import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -95,10 +94,7 @@ public class DAGClientImpl extends DAGClient {
conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
- if (UserGroupInformation.isSecurityEnabled()){
- //TODO: enable ATS integration in kerberos secured cluster - see TEZ-1529
- isATSEnabled = false;
- }
+ isATSEnabled = DAGClientTimelineImpl.isSupported();
realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
statusPollInterval = conf.getLong(
@@ -583,7 +579,8 @@ public class DAGClientImpl extends DAGClient {
private void switchToTimelineClient() throws IOException, TezException {
realClient.close();
- realClient = new DAGClientTimelineImpl(appId, dagId, conf, frameworkClient);
+ realClient = new DAGClientTimelineImpl(appId, dagId, conf, frameworkClient,
+ (int) (2 * PRINT_STATUS_INTERVAL_MILLIS));
if (LOG.isDebugEnabled()) {
LOG.debug("dag completed switching to DAGClientTimelineImpl");
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8710df0d/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index cc000df..9a0949b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -23,9 +23,6 @@ import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -40,16 +37,10 @@ import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -78,10 +69,10 @@ public class DAGClientTimelineImpl extends DAGClient {
private static final String FILTER_BY_FIELDS = "primaryfilters,otherinfo";
private static final String HTTPS_SCHEME = "https://";
private static final String HTTP_SCHEME = "http://";
- private static Client httpClient = null;
+ private Client httpClient = null;
+ private final TimelineReaderFactory.TimelineReaderStrategy timelineReaderStrategy;
private final ApplicationId appId;
private final String dagId;
- private final TezConfiguration conf;
private final FrameworkClient frameworkClient;
private Map<String, VertexTaskStats> vertexTaskStatsCache = null;
@@ -90,16 +81,21 @@ public class DAGClientTimelineImpl extends DAGClient {
protected String baseUri;
public DAGClientTimelineImpl(ApplicationId appId, String dagId, TezConfiguration conf,
- FrameworkClient frameworkClient)
+ FrameworkClient frameworkClient, int connTimeout)
throws TezException {
+
+ if (!TimelineReaderFactory.isTimelineClientSupported()) {
+ throw new TezException("Reading from secure timeline is supported only for hadoop 2.6 and above.");
+ }
+
this.appId = appId;
this.dagId = dagId;
- this.conf = conf;
this.frameworkClient = frameworkClient;
String scheme;
String webAppAddress;
- if (webappHttpsOnly(conf)) {
+ boolean useHttps = webappHttpsOnly(conf);
+ if (useHttps) {
scheme = HTTPS_SCHEME;
webAppAddress = conf.get(ATSConstants.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS_CONF_NAME);
} else {
@@ -111,8 +107,14 @@ public class DAGClientTimelineImpl extends DAGClient {
}
baseUri = Joiner.on("").join(scheme, webAppAddress, ATSConstants.RESOURCE_URI_BASE);
+
+ timelineReaderStrategy =
+ TimelineReaderFactory.getTimelineReaderStrategy(conf, useHttps, connTimeout);
}
+ public static boolean isSupported() {
+ return TimelineReaderFactory.isTimelineClientSupported();
+ }
@Override
public String getExecutionContext() {
@@ -407,13 +409,15 @@ public class DAGClientTimelineImpl extends DAGClient {
@VisibleForTesting
protected JSONObject getJsonRootEntity(String url) throws TezException {
try {
- WebResource wr = getHttpClient().resource(url);
+ WebResource wr = getCachedHttpClient().resource(url);
ClientResponse response = wr.accept(MediaType.APPLICATION_JSON_TYPE)
.type(MediaType.APPLICATION_JSON_TYPE)
.get(ClientResponse.class);
- if (response.getClientResponseStatus() != ClientResponse.Status.OK) {
- throw new TezException("Failed to get response from YARN Timeline: url: " + url);
+ final ClientResponse.Status clientResponseStatus = response.getClientResponseStatus();
+ if (clientResponseStatus != ClientResponse.Status.OK) {
+ throw new TezException("Failed to get response from YARN Timeline:" +
+ " errorCode:" + clientResponseStatus + ", url:" + url);
}
return response.getEntity(JSONObject.class);
@@ -423,6 +427,8 @@ public class DAGClientTimelineImpl extends DAGClient {
throw new TezException("Error accessing content from YARN Timeline - unexpected response", e);
} catch (IllegalArgumentException e) {
throw new TezException("Error accessing content from YARN Timeline - invalid url", e);
+ } catch (IOException e) {
+ throw new TezException("Error failed to get http client", e);
}
}
@@ -460,11 +466,9 @@ public class DAGClientTimelineImpl extends DAGClient {
}
}
- protected Client getHttpClient() {
+ protected Client getCachedHttpClient() throws IOException {
if (httpClient == null) {
- ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
- HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory();
- httpClient = new Client(new URLConnectionClientHandler(urlFactory), config);
+ httpClient = timelineReaderStrategy.getHttpClient();
}
return httpClient;
}
@@ -498,15 +502,6 @@ public class DAGClientTimelineImpl extends DAGClient {
}});
- static class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
- @Override
- public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
- String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" +
- URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
- return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
- }
- }
-
@Override
public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
long timeout) throws IOException, TezException {
http://git-wip-us.apache.org/repos/asf/tez/blob/8710df0d/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
new file mode 100644
index 0000000..f544198
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client;
+
+import static org.apache.hadoop.security.ssl.SSLFactory.Mode.CLIENT;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.security.GeneralSecurityException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * TimelineReaderFactory getTimelineReaderStrategy returns a Strategy class, which is used to
+ * create a httpclient, configured for the appropriate runtime.
+ *
+ * on hadoop 2.6+ the factory returns TimelineReaderTokenAuthenticatedStrategy, which supports
+ * kerberos based auth (secure cluster) or psuedo auth (un-secure cluster).
+ *
+ * on hadoop 2.4 where the token delegation auth is not supported, TimelineReaderPseudoAuthenticatedStrategy
+ * is used which supports only unsecure timeline.
+ *
+ */
+@InterfaceAudience.Private
+public class TimelineReaderFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TimelineReaderFactory.class);
+
+ private static final String KERBEROS_DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME =
+ "org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator";
+ private static final String PSEUDO_DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME =
+ "org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator";
+ private static final String DELEGATION_TOKEN_AUTHENTICATED_URL_CLAZZ_NAME =
+ "org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL";
+ private static final String DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME =
+ "org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator";
+ private static final String DELEGATION_TOKEN_AUTHENTICATED_URL_TOKEN_CLASS_NAME =
+ "org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL$Token";
+
+ private static Class<?> delegationTokenAuthenticatorClazz = null;
+ private static Method delegationTokenAuthenticateURLOpenConnectionMethod = null;
+
+ public static TimelineReaderStrategy getTimelineReaderStrategy(Configuration conf,
+ boolean useHttps,
+ int connTimeout) throws TezException {
+
+ TimelineReaderStrategy timelineReaderStrategy;
+
+ if (!isTimelineClientSupported()) {
+ throw new TezException("Reading from timeline is not supported." +
+ " token delegation support: " + tokenDelegationSupported() +
+ ", is secure timeline: " + UserGroupInformation.isSecurityEnabled());
+ }
+
+ timelineReaderStrategy = getTimelineReaderStrategy(tokenDelegationSupported(), conf, useHttps,
+ connTimeout);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using " + timelineReaderStrategy.getClass().getName() + " to read timeline data");
+ }
+
+ return timelineReaderStrategy;
+ }
+
+ private static TimelineReaderStrategy getTimelineReaderStrategy(boolean isTokenDelegationSupported,
+ Configuration conf,
+ boolean useHttps,
+ int connTimeout) {
+ TimelineReaderStrategy timelineReaderStrategy;
+
+ if (isTokenDelegationSupported) {
+ timelineReaderStrategy =
+ new TimelineReaderTokenAuthenticatedStrategy(conf, useHttps, connTimeout);
+ } else {
+ timelineReaderStrategy =
+ new TimelineReaderPseudoAuthenticatedStrategy(conf, useHttps, connTimeout);
+ }
+
+ return timelineReaderStrategy;
+ }
+
+ /**
+ * Check if timeline client can be supported.
+ *
+ * @return boolean value indicating if timeline client to read data is supported.
+ */
+ public static boolean isTimelineClientSupported() {
+ // support to read data from timeline is based on the version of hadoop.
+ // reads are supported for non-secure cluster from hadoop 2.4 and up.
+ // reads are supported for secure cluster only from hadoop 2.6. check the presence of the classes
+ // required upfront if security is enabled.
+ return !UserGroupInformation.isSecurityEnabled() || tokenDelegationSupported();
+ }
+
+ public interface TimelineReaderStrategy {
+ Client getHttpClient() throws IOException;
+ }
+
+ /*
+ * auth strategy for secured and unsecured environment with delegation token (hadoop 2.6 and above)
+ */
+ private static class TimelineReaderTokenAuthenticatedStrategy implements TimelineReaderStrategy {
+ private final Configuration conf;
+ private final boolean useHttps;
+ private final int connTimeout;
+
+ public TimelineReaderTokenAuthenticatedStrategy(final Configuration conf,
+ final boolean useHttps,
+ final int connTimeout) {
+
+ this.conf = conf;
+ this.useHttps = useHttps;
+ this.connTimeout = connTimeout;
+ }
+
+ @Override
+ public Client getHttpClient() throws IOException {
+ Authenticator authenticator;
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ UserGroupInformation realUgi = ugi.getRealUser();
+ UserGroupInformation authUgi;
+ String doAsUser;
+ ClientConfig clientConfig = new DefaultClientConfig(JSONRootElementProvider.App.class);
+ ConnectionConfigurator connectionConfigurator = getNewConnectionConf(conf, useHttps,
+ connTimeout);
+
+ try {
+ authenticator = getTokenAuthenticator();
+ authenticator.setConnectionConfigurator(connectionConfigurator);
+ } catch (TezUncheckedException e) {
+ throw new IOException("Failed to get authenticator", e);
+ }
+
+ if (realUgi != null) {
+ authUgi = realUgi;
+ doAsUser = ugi.getShortUserName();
+ } else {
+ authUgi = ugi;
+ doAsUser = null;
+ }
+
+ HttpURLConnectionFactory connectionFactory =
+ new TokenAuthenticatedURLConnectionFactory(connectionConfigurator, authenticator,
+ authUgi, doAsUser);
+ return new Client(new URLConnectionClientHandler(connectionFactory), clientConfig);
+ }
+
+ private static Authenticator getTokenAuthenticator() {
+ String authenticatorClazzName;
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ authenticatorClazzName = KERBEROS_DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME;
+ } else {
+ authenticatorClazzName = PSEUDO_DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME;
+ }
+
+ return ReflectionUtils.createClazzInstance(authenticatorClazzName);
+ }
+
+ private static class TokenAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
+
+ private final Authenticator authenticator;
+ private final ConnectionConfigurator connConfigurator;
+ private final UserGroupInformation authUgi;
+ private final String doAsUser;
+ private final AuthenticatedURL.Token token;
+
+ public TokenAuthenticatedURLConnectionFactory(ConnectionConfigurator connConfigurator,
+ Authenticator authenticator,
+ UserGroupInformation authUgi,
+ String doAsUser) {
+ this.connConfigurator = connConfigurator;
+ this.authenticator = authenticator;
+ this.authUgi = authUgi;
+ this.doAsUser = doAsUser;
+ this.token = ReflectionUtils.createClazzInstance(
+ DELEGATION_TOKEN_AUTHENTICATED_URL_TOKEN_CLASS_NAME, null, null);
+ }
+
+ @Override
+ public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+ try {
+ AuthenticatedURL authenticatedURL= ReflectionUtils.createClazzInstance(
+ DELEGATION_TOKEN_AUTHENTICATED_URL_CLAZZ_NAME, new Class[] {
+ delegationTokenAuthenticatorClazz,
+ ConnectionConfigurator.class
+ }, new Object[] {
+ authenticator,
+ connConfigurator
+ });
+ return ReflectionUtils.invokeMethod(authenticatedURL,
+ delegationTokenAuthenticateURLOpenConnectionMethod, url, token, doAsUser);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ }
+
+ /*
+ * Pseudo auth strategy for env where delegation token is not supported (hadoop 2.4)
+ */
+ @VisibleForTesting
+ protected static class TimelineReaderPseudoAuthenticatedStrategy implements TimelineReaderStrategy {
+
+ private final ConnectionConfigurator connectionConf;
+
+ public TimelineReaderPseudoAuthenticatedStrategy(final Configuration conf,
+ final boolean useHttps,
+ final int connTimeout) {
+ connectionConf = getNewConnectionConf(conf, useHttps, connTimeout);
+ }
+
+ @Override
+ public Client getHttpClient() {
+ ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
+ HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory(connectionConf);
+ Client httpClient = new Client(new URLConnectionClientHandler(urlFactory), config);
+ return httpClient;
+ }
+
+ @VisibleForTesting
+ protected static class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
+ private final ConnectionConfigurator connectionConf;
+
+ public PseudoAuthenticatedURLConnectionFactory(ConnectionConfigurator connectionConf) {
+ this.connectionConf = connectionConf;
+ }
+
+ @Override
+ public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+ String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" +
+ URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
+
+ HttpURLConnection httpURLConnection =
+ (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
+ this.connectionConf.configure(httpURLConnection);
+
+ return httpURLConnection;
+ }
+ }
+ }
+
+ private static ConnectionConfigurator getNewConnectionConf(final Configuration conf,
+ final boolean useHttps,
+ final int connTimeout) {
+ ConnectionConfigurator connectionConf = null;
+ if (useHttps) {
+ try {
+ connectionConf = getNewSSLConnectionConf(conf, connTimeout);
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot load customized ssl related configuration."
+ + " Falling back to system-generic settings.", e);
+ }
+ }
+ }
+
+ if (connectionConf == null) {
+ connectionConf = new ConnectionConfigurator() {
+ @Override
+ public HttpURLConnection configure(HttpURLConnection httpURLConnection) throws IOException {
+ setTimeouts(httpURLConnection, connTimeout);
+ return httpURLConnection;
+ }
+ };
+ }
+
+ return connectionConf;
+ }
+
+ private static ConnectionConfigurator getNewSSLConnectionConf(final Configuration conf,
+ final int connTimeout)
+ throws IOException {
+ final SSLFactory sslFactory;
+ final SSLSocketFactory sslSocketFactory;
+ final HostnameVerifier hostnameVerifier;
+
+ sslFactory = new SSLFactory(CLIENT, conf);
+ try {
+ sslFactory.init();
+ sslSocketFactory = sslFactory.createSSLSocketFactory();
+ } catch (GeneralSecurityException e) {
+ sslFactory.destroy();
+ throw new IOException("Failed to initialize ssl factory");
+ }
+ hostnameVerifier = sslFactory.getHostnameVerifier();
+
+ return new ConnectionConfigurator() {
+ @Override
+ public HttpURLConnection configure(HttpURLConnection httpURLConnection) throws IOException {
+ if (!(httpURLConnection instanceof HttpsURLConnection)) {
+ throw new IOException("Expected https connection");
+ }
+ HttpsURLConnection httpsURLConnection = (HttpsURLConnection) httpURLConnection;
+ httpsURLConnection.setSSLSocketFactory(sslSocketFactory);
+ httpsURLConnection.setHostnameVerifier(hostnameVerifier);
+ setTimeouts(httpsURLConnection, connTimeout);
+
+ return httpsURLConnection;
+ }
+ };
+ }
+
+ private static void setTimeouts(HttpURLConnection httpURLConnection, int connTimeout) {
+ httpURLConnection.setConnectTimeout(connTimeout);
+ httpURLConnection.setReadTimeout(connTimeout);
+ }
+
+ private static boolean isTokenDelegationSupportChecksDone = false;
+ private static boolean isTokenDelegationClassesPresent = false;
+
+ // Check if all the classes required for doing token authentication are present. These classes
+ // are present only from hadoop 2.6 onwards.
+ private static synchronized boolean tokenDelegationSupported() {
+
+ if (!isTokenDelegationSupportChecksDone) {
+
+ isTokenDelegationSupportChecksDone = true;
+
+ try {
+ ReflectionUtils.getClazz(KERBEROS_DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME);
+ ReflectionUtils.getClazz(PSEUDO_DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME);
+
+ delegationTokenAuthenticatorClazz =
+ ReflectionUtils.getClazz(DELEGATION_TOKEN_AUTHENTICATOR_CLAZZ_NAME);
+
+ Class<?> delegationTokenAuthenticatedURLClazz =
+ ReflectionUtils.getClazz(DELEGATION_TOKEN_AUTHENTICATED_URL_CLAZZ_NAME);
+
+ Class<?> delegationTokenAuthenticatedURLTokenClazz =
+ ReflectionUtils.getClazz(DELEGATION_TOKEN_AUTHENTICATED_URL_TOKEN_CLASS_NAME);
+
+ delegationTokenAuthenticateURLOpenConnectionMethod =
+ ReflectionUtils.getMethod(delegationTokenAuthenticatedURLClazz, "openConnection",
+ URL.class, delegationTokenAuthenticatedURLTokenClazz, String.class);
+
+ isTokenDelegationClassesPresent = true;
+
+ } catch (TezUncheckedException e) {
+ LOG.info("Could not find class required for token delegation, will fallback to pseudo auth");
+ }
+ }
+
+ return isTokenDelegationClassesPresent;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8710df0d/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
index a72b799..ef1b0a5 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
@@ -56,7 +56,7 @@ public class TestATSHttpClient {
public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException {
ApplicationId mockAppId = mock(ApplicationId.class);
DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mockAppId, "EXAMPLE_DAG_ID",
- new TezConfiguration(), null);
+ new TezConfiguration(), null, 0);
DAGClientTimelineImpl spyClient = spy(httpClient);
spyClient.baseUri = "http://yarn.ats.webapp/ws/v1/timeline";
final String expectedDagUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_DAG_ID/EXAMPLE_DAG_ID" +
@@ -80,7 +80,7 @@ public class TestATSHttpClient {
public void testGetDagStatusSimple() throws TezException, JSONException, IOException {
DAGClientTimelineImpl
httpClient = new DAGClientTimelineImpl(mock(ApplicationId.class),"EXAMPLE_DAG_ID",
- new TezConfiguration(), null);
+ new TezConfiguration(), null, 0);
DAGClientTimelineImpl spyClient = spy(httpClient);
spyClient.baseUri = "http://yarn.ats.webapp/ws/v1/timeline";
final String expectedDagUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_DAG_ID/EXAMPLE_DAG_ID" +
@@ -140,7 +140,7 @@ public class TestATSHttpClient {
public void testGetVertexStatusSimple() throws JSONException, TezException, IOException {
DAGClientTimelineImpl
httpClient = new DAGClientTimelineImpl(mock(ApplicationId.class), "EXAMPLE_DAG_ID",
- new TezConfiguration(), null);
+ new TezConfiguration(), null, 0);
DAGClientTimelineImpl spyClient = spy(httpClient);
spyClient.baseUri = "http://yarn.ats.webapp/ws/v1/timeline";
final String expectedVertexUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_VERTEX_ID" +
http://git-wip-us.apache.org/repos/asf/tez/blob/8710df0d/tez-api/src/test/java/org/apache/tez/dag/api/client/TestTimelineReaderFactory.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestTimelineReaderFactory.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestTimelineReaderFactory.java
new file mode 100644
index 0000000..4aff0ca
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestTimelineReaderFactory.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client;
+
+import static org.mockito.Mockito.mock;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.TimelineReaderFactory.TimelineReaderPseudoAuthenticatedStrategy;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTimelineReaderFactory {
+
+ @Before
+ public void setup() {
+ // Disable tests if hadoop version is less than 2.4.0
+ // as Timeline is not supported in 2.2.x or 2.3.x
+ String hadoopVersion = System.getProperty("tez.hadoop.version");
+ Assume.assumeFalse(hadoopVersion.startsWith("2.2.") || hadoopVersion.startsWith("2.3."));
+ }
+
+ // ensure on hadoop 2.4 TimelinePseudoAuthenticatedStrategy is used.
+ @Test(timeout = 5000)
+ public void testShouldUsePseudoAuthStrategyForHadoop24() throws TezException {
+ String hadoopVersion = System.getProperty("tez.hadoop.version");
+ Assume.assumeTrue(hadoopVersion.startsWith("2.4.") || hadoopVersion.startsWith("2.5."));
+
+ String returnedClassName =
+ TimelineReaderFactory.getTimelineReaderStrategy(mock(Configuration.class), false, 0)
+ .getClass()
+ .getCanonicalName();
+ Assert.assertEquals("should use pseudo auth on hadoop2.4",
+ "org.apache.tez.dag.api.client.TimelineReaderFactory.TimelineReaderPseudoAuthenticatedStrategy",
+ returnedClassName);
+ }
+
+ // ensure on hadoop 2.6+ TimelineReaderTokenAuthenticatedStrategy is used.
+ @Test(timeout = 5000)
+ public void testShouldUseTokenDelegationAuthStrategyForHadoop26() throws TezException {
+ String hadoopVersion = System.getProperty("tez.hadoop.version");
+ Assume.assumeFalse(hadoopVersion.startsWith("2.2.") ||
+ hadoopVersion.startsWith("2.3.") ||
+ hadoopVersion.startsWith("2.4.") ||
+ hadoopVersion.startsWith("2.5."));
+
+ String returnedClassName =
+ TimelineReaderFactory.getTimelineReaderStrategy(mock(Configuration.class), false, 0)
+ .getClass()
+ .getCanonicalName();
+ Assert.assertEquals("should use pseudo auth on hadoop2.4",
+ "org.apache.tez.dag.api.client.TimelineReaderFactory.TimelineReaderTokenAuthenticatedStrategy",
+ returnedClassName);
+ }
+
+ @Test(timeout = 5000)
+ public void testPseudoAuthenticatorConnectionUrlShouldHaveUserName() throws Exception {
+ ConnectionConfigurator connConf = mock(ConnectionConfigurator.class);
+ TimelineReaderPseudoAuthenticatedStrategy.PseudoAuthenticatedURLConnectionFactory
+ connectionFactory = new TimelineReaderPseudoAuthenticatedStrategy
+ .PseudoAuthenticatedURLConnectionFactory(connConf);
+ String inputUrl = "http://host:8080/path";
+ String expectedUrl = inputUrl + "?user.name=" + UserGroupInformation.getCurrentUser().getShortUserName();
+ HttpURLConnection httpURLConnection = connectionFactory.getHttpURLConnection(new URL(inputUrl));
+ Assert.assertEquals(expectedUrl, httpURLConnection.getURL().toString());
+ }
+
+}