You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/05/14 02:10:31 UTC

[2/2] tez git commit: Revert "TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)"

Revert "TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)"

This reverts commit f0b9d7ec7472d6c4b932c249f6106c5b6ca85b88.

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2467d608
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2467d608
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2467d608

Branch: refs/heads/master
Commit: 2467d60896e5c99536fe4e64b58c6f57dc1587ec
Parents: 2df32bf
Author: Jeff Zhang <zj...@apache.org>
Authored: Thu May 14 08:10:12 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu May 14 08:10:12 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   5 +-
 .../tez/dag/api/client/DAGClientImpl.java       |   5 +
 .../dag/api/client/DAGClientTimelineImpl.java   | 187 ++-----------------
 .../tez/dag/api/client/TestATSHttpClient.java   |   2 +-
 4 files changed, 29 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2467d608/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 92c4df2..c5acc21 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,7 +24,6 @@ INCOMPATIBLE CHANGES
 ALL CHANGES:
   TEZ-2445. Disable the object cleanup in local mode in LogicalIOProcessorRuntimeTask.
   TEZ-2057. tez-dag/pom.xml contains versions for dependencies.
-  TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
   TEZ-2435. Add public key to KEYS
   TEZ-2421. Deadlock in AM because attempt and vertex locking each other out
   TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts.
@@ -195,9 +194,12 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+<<<<<<< HEAD
   TEZ-2057. tez-dag/pom.xml contains versions for dependencies.
   TEZ-2282. Delimit reused yarn container logs (stderr, stdout, syslog) with task attempt start/stop events
   TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
+=======
+>>>>>>> parent of f0b9d7e... TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh)
   TEZ-2396. pig-tez-tfile-parser pom is hard coded to depend on 0.6.0-SNAPSHOT version.
   TEZ-2237. Valid events should be sent out when an Output is not started.
   TEZ-1988. Tez UI: does not work when using file:// in a browser
@@ -374,7 +376,6 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
-  TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
   TEZ-2369. Add a few unit tests for RootInputInitializerManager.
   TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:
     Invalid event: T_ATTEMPT_KILLED at KILLED.

http://git-wip-us.apache.org/repos/asf/tez/blob/2467d608/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index b0ad51c..de6ede6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -93,6 +93,11 @@ public class DAGClientImpl extends DAGClient {
             conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
                  TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
 
+    if (UserGroupInformation.isSecurityEnabled()){
+      //TODO: enable ATS integration in kerberos secured cluster - see TEZ-1529
+      isATSEnabled = false;
+    }
+
     realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
     statusPollInterval = conf.getLong(
         TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS,

http://git-wip-us.apache.org/repos/asf/tez/blob/2467d608/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index d0b11d6..cc000df 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -19,20 +19,13 @@
 package org.apache.tez.dag.api.client;
 
 import javax.annotation.Nullable;
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLSocketFactory;
 import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.lang.reflect.UndeclaredThrowableException;
 import java.net.HttpURLConnection;
-import java.net.URI;
 import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
-import java.security.PrivilegedExceptionAction;
+import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -52,14 +45,6 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
 import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
 import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -96,13 +81,8 @@ public class DAGClientTimelineImpl extends DAGClient {
   private static Client httpClient = null;
   private final ApplicationId appId;
   private final String dagId;
+  private final TezConfiguration conf;
   private final FrameworkClient frameworkClient;
-  private final UserGroupInformation authUgi;
-  private final String doAsUser;
-  private final DelegationTokenAuthenticator authenticator;
-  private final DelegationTokenAuthenticatedURL.Token token;
-  private final ConnectionConfigurator connConfigurator;
-  private final static int DEFAULT_SOCKET_TIMEOUT = 30 * 1000; // 30 seconds
 
   private Map<String, VertexTaskStats> vertexTaskStatsCache = null;
 
@@ -111,9 +91,10 @@ public class DAGClientTimelineImpl extends DAGClient {
 
   public DAGClientTimelineImpl(ApplicationId appId, String dagId, TezConfiguration conf,
                                FrameworkClient frameworkClient)
-      throws TezException, IOException {
+      throws TezException {
     this.appId = appId;
     this.dagId = dagId;
+    this.conf = conf;
     this.frameworkClient = frameworkClient;
 
     String scheme;
@@ -130,29 +111,9 @@ public class DAGClientTimelineImpl extends DAGClient {
     }
 
     baseUri = Joiner.on("").join(scheme, webAppAddress, ATSConstants.RESOURCE_URI_BASE);
-
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    UserGroupInformation realUgi = ugi.getRealUser();
-    if (realUgi != null) {
-      authUgi = realUgi;
-      doAsUser = ugi.getShortUserName();
-    } else {
-      authUgi = ugi;
-      doAsUser = null;
-    }
-
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      authenticator = new KerberosDelegationTokenAuthenticator();
-    } else {
-      authenticator = new PseudoDelegationTokenAuthenticator();
-    }
-
-    connConfigurator = newConnConfigurator(conf);
-    authenticator.setConnectionConfigurator(connConfigurator);
-    token = new DelegationTokenAuthenticatedURL.Token();
   }
 
+
   @Override
   public String getExecutionContext() {
     return "Executing on YARN cluster with App id " + appId;
@@ -164,13 +125,7 @@ public class DAGClientTimelineImpl extends DAGClient {
     try {
       appReport = frameworkClient.getApplicationReport(appId);
     } catch (YarnException e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("error getting application report", e);
-      }
     } catch (IOException e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("error getting application report", e);
-      }
     }
     return appReport;
   }
@@ -457,13 +412,8 @@ public class DAGClientTimelineImpl extends DAGClient {
           .type(MediaType.APPLICATION_JSON_TYPE)
           .get(ClientResponse.class);
 
-      final ClientResponse.Status clientResponseStatus = response.getClientResponseStatus();
-      if (clientResponseStatus != ClientResponse.Status.OK) {
-        if (clientResponseStatus == ClientResponse.Status.UNAUTHORIZED) {
-          httpClient = null;
-        }
-        throw new TezException("Failed to get response from YARN Timeline: url: " + url +
-          " error: " + clientResponseStatus);
+      if (response.getClientResponseStatus() != ClientResponse.Status.OK) {
+        throw new TezException("Failed to get response from YARN Timeline: url: " + url);
       }
 
       return response.getEntity(JSONObject.class);
@@ -473,8 +423,6 @@ public class DAGClientTimelineImpl extends DAGClient {
       throw new TezException("Error accessing content from YARN Timeline - unexpected response", e);
     } catch (IllegalArgumentException e) {
       throw new TezException("Error accessing content from YARN Timeline - invalid url", e);
-    } catch (IOException e) {
-      throw new TezException("Error getting http client connection", e);
     }
   }
 
@@ -512,119 +460,15 @@ public class DAGClientTimelineImpl extends DAGClient {
     }
   }
 
-  protected Client getHttpClient() throws IOException, TezException {
+  protected Client getHttpClient() {
     if (httpClient == null) {
-      if (UserGroupInformation.isSecurityEnabled()) {
-        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-        try {
-          final Token<?> delegationToken = getDelegationToken(currentUser.getUserName());
-          currentUser.addToken(delegationToken);
-        } catch (UndeclaredThrowableException e) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("exception getting httpclient token", e);
-          }
-        }
-      }
-
-      ClientConfig clientConfig = new DefaultClientConfig(JSONRootElementProvider.App.class);
-      HttpURLConnectionFactory connectionFactory = new TimelineURLConnectionFactory();
-      httpClient = new Client(new URLConnectionClientHandler(connectionFactory), clientConfig);
+      ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
+      HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory();
+      httpClient = new Client(new URLConnectionClientHandler(urlFactory), config);
     }
     return httpClient;
   }
 
-  private Token<?> getDelegationToken(final String renewer) throws
-      IOException, TezException {
-    authUgi.checkTGTAndReloginFromKeytab();
-    try {
-      return authUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
-        @Override
-        public Token<?> run() throws IOException, AuthenticationException {
-          try {
-            URI resURI = URI.create(baseUri);
-            DelegationTokenAuthenticatedURL authUrl =
-                new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
-            return (Token) authUrl.getDelegationToken(resURI.toURL(), token, renewer, doAsUser);
-          } catch (IllegalArgumentException e) {
-            throw new IOException("invalid url " + baseUri, e);
-          }
-        }
-      });
-    } catch (InterruptedException e) {
-      throw new TezException(e);
-    }
-  }
-
-  private class TimelineURLConnectionFactory implements HttpURLConnectionFactory {
-
-    @Override
-    public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
-      try {
-        return new DelegationTokenAuthenticatedURL(
-            authenticator, connConfigurator).openConnection(url, token,
-            doAsUser);
-      } catch (UndeclaredThrowableException e) {
-        throw new IOException(e.getCause());
-      } catch (AuthenticationException ae) {
-        throw new IOException(ae);
-      }
-    }
-
-  }
-
-  private static ConnectionConfigurator newConnConfigurator(Configuration conf) {
-    try {
-      return newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
-    } catch (Exception e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Cannot load customized ssl related configuration. " +
-            "Fallback to system-generic settings.", e);
-      }
-      return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
-    }
-  }
-
-  private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
-      new ConnectionConfigurator() {
-        @Override
-        public HttpURLConnection configure(HttpURLConnection conn)
-            throws IOException {
-          setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
-          return conn;
-        }
-      };
-
-  private static ConnectionConfigurator newSslConnConfigurator(final int timeout, Configuration conf)
-      throws IOException, GeneralSecurityException {
-    final SSLFactory factory;
-    final SSLSocketFactory sf;
-    final HostnameVerifier hv;
-
-    factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
-    factory.init();
-    sf = factory.createSSLSocketFactory();
-    hv = factory.getHostnameVerifier();
-
-    return new ConnectionConfigurator() {
-      @Override
-      public HttpURLConnection configure(HttpURLConnection conn)
-          throws IOException {
-        if (conn instanceof HttpsURLConnection) {
-          HttpsURLConnection c = (HttpsURLConnection) conn;
-          c.setSSLSocketFactory(sf);
-          c.setHostnameVerifier(hv);
-        }
-        setTimeouts(conn, timeout);
-        return conn;
-      }
-    };
-  }
-
-  private static void setTimeouts(URLConnection connection, int socketTimeout) {
-    connection.setConnectTimeout(socketTimeout);
-    connection.setReadTimeout(socketTimeout);
-  }
-
   private static final Map<String, DAGStatusStateProto> dagStateProtoMap =
       Collections.unmodifiableMap(new HashMap<String, DAGStatusStateProto>() {{
         put("NEW", DAGStatusStateProto.DAG_SUBMITTED);
@@ -654,6 +498,15 @@ public class DAGClientTimelineImpl extends DAGClient {
       }});
 
 
+  static class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
+    @Override
+    public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+      String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" +
+          URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
+      return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
+    }
+  }
+
   @Override
   public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
       long timeout) throws IOException, TezException {

http://git-wip-us.apache.org/repos/asf/tez/blob/2467d608/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
index aafc28f..a72b799 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
@@ -53,7 +53,7 @@ public class TestATSHttpClient {
   }
 
   @Test(timeout = 5000)
-  public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException, IOException {
+  public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException {
     ApplicationId mockAppId = mock(ApplicationId.class);
     DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mockAppId, "EXAMPLE_DAG_ID",
         new TezConfiguration(), null);