You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) is not preserved in this plain-text rendering.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/05/11 20:45:22 UTC

tez git commit: TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)

Repository: tez
Updated Branches:
  refs/heads/master e00b44a80 -> f0b9d7ec7


TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f0b9d7ec
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f0b9d7ec
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f0b9d7ec

Branch: refs/heads/master
Commit: f0b9d7ec7472d6c4b932c249f6106c5b6ca85b88
Parents: e00b44a
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon May 11 11:44:53 2015 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon May 11 11:44:53 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../tez/dag/api/client/DAGClientImpl.java       |   5 -
 .../dag/api/client/DAGClientTimelineImpl.java   | 187 +++++++++++++++++--
 .../tez/dag/api/client/TestATSHttpClient.java   |   2 +-
 4 files changed, 171 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f0b9d7ec/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4c2baf..265281f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
     Default max limit increased. Should not affect existing users.
 
 ALL CHANGES:
+  TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
   TEZ-2435. Add public key to KEYS
   TEZ-2421. Deadlock in AM because attempt and vertex locking each other out
   TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts.
@@ -190,6 +191,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
   TEZ-2396. pig-tez-tfile-parser pom is hard coded to depend on 0.6.0-SNAPSHOT version.
   TEZ-2237. Valid events should be sent out when an Output is not started.
   TEZ-1988. Tez UI: does not work when using file:// in a browser
@@ -366,6 +368,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
   TEZ-2369. Add a few unit tests for RootInputInitializerManager.
   TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:
     Invalid event: T_ATTEMPT_KILLED at KILLED.

http://git-wip-us.apache.org/repos/asf/tez/blob/f0b9d7ec/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index de6ede6..b0ad51c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -93,11 +93,6 @@ public class DAGClientImpl extends DAGClient {
             conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
                  TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
 
-    if (UserGroupInformation.isSecurityEnabled()){
-      //TODO: enable ATS integration in kerberos secured cluster - see TEZ-1529
-      isATSEnabled = false;
-    }
-
     realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
     statusPollInterval = conf.getLong(
         TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS,

http://git-wip-us.apache.org/repos/asf/tez/blob/f0b9d7ec/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index cc000df..d0b11d6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -19,13 +19,20 @@
 package org.apache.tez.dag.api.client;
 
 import javax.annotation.Nullable;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
 import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.net.HttpURLConnection;
+import java.net.URI;
 import java.net.URL;
-import java.net.URLEncoder;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -45,6 +52,14 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
 import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
 import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -81,8 +96,13 @@ public class DAGClientTimelineImpl extends DAGClient {
   private static Client httpClient = null;
   private final ApplicationId appId;
   private final String dagId;
-  private final TezConfiguration conf;
   private final FrameworkClient frameworkClient;
+  private final UserGroupInformation authUgi;
+  private final String doAsUser;
+  private final DelegationTokenAuthenticator authenticator;
+  private final DelegationTokenAuthenticatedURL.Token token;
+  private final ConnectionConfigurator connConfigurator;
+  private final static int DEFAULT_SOCKET_TIMEOUT = 30 * 1000; // 30 seconds
 
   private Map<String, VertexTaskStats> vertexTaskStatsCache = null;
 
@@ -91,10 +111,9 @@ public class DAGClientTimelineImpl extends DAGClient {
 
   public DAGClientTimelineImpl(ApplicationId appId, String dagId, TezConfiguration conf,
                                FrameworkClient frameworkClient)
-      throws TezException {
+      throws TezException, IOException {
     this.appId = appId;
     this.dagId = dagId;
-    this.conf = conf;
     this.frameworkClient = frameworkClient;
 
     String scheme;
@@ -111,8 +130,28 @@ public class DAGClientTimelineImpl extends DAGClient {
     }
 
     baseUri = Joiner.on("").join(scheme, webAppAddress, ATSConstants.RESOURCE_URI_BASE);
-  }
 
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    UserGroupInformation realUgi = ugi.getRealUser();
+    if (realUgi != null) {
+      authUgi = realUgi;
+      doAsUser = ugi.getShortUserName();
+    } else {
+      authUgi = ugi;
+      doAsUser = null;
+    }
+
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      authenticator = new KerberosDelegationTokenAuthenticator();
+    } else {
+      authenticator = new PseudoDelegationTokenAuthenticator();
+    }
+
+    connConfigurator = newConnConfigurator(conf);
+    authenticator.setConnectionConfigurator(connConfigurator);
+    token = new DelegationTokenAuthenticatedURL.Token();
+  }
 
   @Override
   public String getExecutionContext() {
@@ -125,7 +164,13 @@ public class DAGClientTimelineImpl extends DAGClient {
     try {
       appReport = frameworkClient.getApplicationReport(appId);
     } catch (YarnException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("error getting application report", e);
+      }
     } catch (IOException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("error getting application report", e);
+      }
     }
     return appReport;
   }
@@ -412,8 +457,13 @@ public class DAGClientTimelineImpl extends DAGClient {
           .type(MediaType.APPLICATION_JSON_TYPE)
           .get(ClientResponse.class);
 
-      if (response.getClientResponseStatus() != ClientResponse.Status.OK) {
-        throw new TezException("Failed to get response from YARN Timeline: url: " + url);
+      final ClientResponse.Status clientResponseStatus = response.getClientResponseStatus();
+      if (clientResponseStatus != ClientResponse.Status.OK) {
+        if (clientResponseStatus == ClientResponse.Status.UNAUTHORIZED) {
+          httpClient = null;
+        }
+        throw new TezException("Failed to get response from YARN Timeline: url: " + url +
+          " error: " + clientResponseStatus);
       }
 
       return response.getEntity(JSONObject.class);
@@ -423,6 +473,8 @@ public class DAGClientTimelineImpl extends DAGClient {
       throw new TezException("Error accessing content from YARN Timeline - unexpected response", e);
     } catch (IllegalArgumentException e) {
       throw new TezException("Error accessing content from YARN Timeline - invalid url", e);
+    } catch (IOException e) {
+      throw new TezException("Error getting http client connection", e);
     }
   }
 
@@ -460,15 +512,119 @@ public class DAGClientTimelineImpl extends DAGClient {
     }
   }
 
-  protected Client getHttpClient() {
+  protected Client getHttpClient() throws IOException, TezException {
     if (httpClient == null) {
-      ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
-      HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory();
-      httpClient = new Client(new URLConnectionClientHandler(urlFactory), config);
+      if (UserGroupInformation.isSecurityEnabled()) {
+        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        try {
+          final Token<?> delegationToken = getDelegationToken(currentUser.getUserName());
+          currentUser.addToken(delegationToken);
+        } catch (UndeclaredThrowableException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("exception getting httpclient token", e);
+          }
+        }
+      }
+
+      ClientConfig clientConfig = new DefaultClientConfig(JSONRootElementProvider.App.class);
+      HttpURLConnectionFactory connectionFactory = new TimelineURLConnectionFactory();
+      httpClient = new Client(new URLConnectionClientHandler(connectionFactory), clientConfig);
     }
     return httpClient;
   }
 
+  private Token<?> getDelegationToken(final String renewer) throws
+      IOException, TezException {
+    authUgi.checkTGTAndReloginFromKeytab();
+    try {
+      return authUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
+        @Override
+        public Token<?> run() throws IOException, AuthenticationException {
+          try {
+            URI resURI = URI.create(baseUri);
+            DelegationTokenAuthenticatedURL authUrl =
+                new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
+            return (Token) authUrl.getDelegationToken(resURI.toURL(), token, renewer, doAsUser);
+          } catch (IllegalArgumentException e) {
+            throw new IOException("invalid url " + baseUri, e);
+          }
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new TezException(e);
+    }
+  }
+
+  private class TimelineURLConnectionFactory implements HttpURLConnectionFactory {
+
+    @Override
+    public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
+      try {
+        return new DelegationTokenAuthenticatedURL(
+            authenticator, connConfigurator).openConnection(url, token,
+            doAsUser);
+      } catch (UndeclaredThrowableException e) {
+        throw new IOException(e.getCause());
+      } catch (AuthenticationException ae) {
+        throw new IOException(ae);
+      }
+    }
+
+  }
+
+  private static ConnectionConfigurator newConnConfigurator(Configuration conf) {
+    try {
+      return newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
+    } catch (Exception e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cannot load customized ssl related configuration. " +
+            "Fallback to system-generic settings.", e);
+      }
+      return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
+    }
+  }
+
+  private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
+      new ConnectionConfigurator() {
+        @Override
+        public HttpURLConnection configure(HttpURLConnection conn)
+            throws IOException {
+          setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
+          return conn;
+        }
+      };
+
+  private static ConnectionConfigurator newSslConnConfigurator(final int timeout, Configuration conf)
+      throws IOException, GeneralSecurityException {
+    final SSLFactory factory;
+    final SSLSocketFactory sf;
+    final HostnameVerifier hv;
+
+    factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+    factory.init();
+    sf = factory.createSSLSocketFactory();
+    hv = factory.getHostnameVerifier();
+
+    return new ConnectionConfigurator() {
+      @Override
+      public HttpURLConnection configure(HttpURLConnection conn)
+          throws IOException {
+        if (conn instanceof HttpsURLConnection) {
+          HttpsURLConnection c = (HttpsURLConnection) conn;
+          c.setSSLSocketFactory(sf);
+          c.setHostnameVerifier(hv);
+        }
+        setTimeouts(conn, timeout);
+        return conn;
+      }
+    };
+  }
+
+  private static void setTimeouts(URLConnection connection, int socketTimeout) {
+    connection.setConnectTimeout(socketTimeout);
+    connection.setReadTimeout(socketTimeout);
+  }
+
   private static final Map<String, DAGStatusStateProto> dagStateProtoMap =
       Collections.unmodifiableMap(new HashMap<String, DAGStatusStateProto>() {{
         put("NEW", DAGStatusStateProto.DAG_SUBMITTED);
@@ -498,15 +654,6 @@ public class DAGClientTimelineImpl extends DAGClient {
       }});
 
 
-  static class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
-    @Override
-    public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
-      String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" +
-          URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
-      return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
-    }
-  }
-
   @Override
   public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
       long timeout) throws IOException, TezException {

http://git-wip-us.apache.org/repos/asf/tez/blob/f0b9d7ec/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
index a72b799..aafc28f 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
@@ -53,7 +53,7 @@ public class TestATSHttpClient {
   }
 
   @Test(timeout = 5000)
-  public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException {
+  public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException, IOException {
     ApplicationId mockAppId = mock(ApplicationId.class);
     DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mockAppId, "EXAMPLE_DAG_ID",
         new TezConfiguration(), null);