You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/11/21 00:57:38 UTC

git commit: TEZ-633. Fix client security for Tez session APIs. (sseth)

Updated Branches:
  refs/heads/master c171e963f -> 7a3071d8d


TEZ-633. Fix client security for Tez session APIs. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7a3071d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7a3071d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7a3071d8

Branch: refs/heads/master
Commit: 7a3071d8d7f5ac6ff0f8f4a42d39961cbe3a692d
Parents: c171e96
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Nov 20 15:57:21 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Nov 20 15:57:21 2013 -0800

----------------------------------------------------------------------
 .../org/apache/tez/client/TezClientUtils.java   | 42 +++++++++++++++-----
 .../dag/api/client/rpc/DAGClientRPCImpl.java    | 38 ++----------------
 2 files changed, 35 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7a3071d8/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 6f5d098..df15b53 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -35,6 +36,7 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -47,6 +49,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
@@ -601,19 +605,35 @@ public class TezClientUtils {
     } catch (YarnException e) {
       throw new TezException(e);
     }
-    return getAMProxy(conf, appReport.getHost(), appReport.getRpcPort());
+    return getAMProxy(conf, appReport.getHost(), appReport.getRpcPort(), appReport.getClientToAMToken());
   }
 
-  static DAGClientAMProtocolBlockingPB getAMProxy(Configuration conf,
-      String amHost, int amRpcPort) throws IOException {
-    InetSocketAddress addr = new InetSocketAddress(amHost,
-        amRpcPort);
-
-    RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
-        ProtobufRpcEngine.class);
-    DAGClientAMProtocolBlockingPB proxy =
-        (DAGClientAMProtocolBlockingPB) RPC.getProxy(
-            DAGClientAMProtocolBlockingPB.class, 0, addr, conf);
+  @Private
+  public static DAGClientAMProtocolBlockingPB getAMProxy(final Configuration conf, String amHost,
+      int amRpcPort, org.apache.hadoop.yarn.api.records.Token clientToAMToken) throws IOException {
+
+    final InetSocketAddress serviceAddr = new InetSocketAddress(amHost, amRpcPort);
+    UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(UserGroupInformation
+        .getCurrentUser().getUserName());
+    if (clientToAMToken != null) {
+      Token<ClientToAMTokenIdentifier> token = ConverterUtils.convertFromYarn(clientToAMToken,
+          serviceAddr);
+      userUgi.addToken(token);
+    }
+    LOG.debug("Connecting to " + serviceAddr);
+    DAGClientAMProtocolBlockingPB proxy = null;
+    try {
+      proxy = userUgi.doAs(new PrivilegedExceptionAction<DAGClientAMProtocolBlockingPB>() {
+        @Override
+        public DAGClientAMProtocolBlockingPB run() throws IOException {
+          RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class, ProtobufRpcEngine.class);
+          return (DAGClientAMProtocolBlockingPB) RPC.getProxy(DAGClientAMProtocolBlockingPB.class,
+              0, serviceAddr, conf);
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException("Failed to connect to AM", e);
+    }
     return proxy;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7a3071d8/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 07074a2..8afdf05 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -19,18 +19,12 @@
 package org.apache.tez.dag.api.client.rpc;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -38,8 +32,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -306,32 +299,9 @@ public class DAGClientRPCImpl implements DAGClient {
       // attempt not running
       return false;
     }
-    
-    UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
-        UserGroupInformation.getCurrentUser().getUserName());
-    final InetSocketAddress serviceAddr = NetUtils.createSocketAddrForHost(
-        appReport.getHost(), appReport.getRpcPort());
-    org.apache.hadoop.yarn.api.records.Token clientToAMToken =
-        appReport.getClientToAMToken();
-    if (clientToAMToken != null) {
-      Token<ClientToAMTokenIdentifier> token =
-          ConverterUtils.convertFromYarn(clientToAMToken, serviceAddr);
-      newUgi.addToken(token);
-    }
-    LOG.debug("Connecting to " + serviceAddr);
-    try {
-      proxy = newUgi.doAs(new PrivilegedExceptionAction<DAGClientAMProtocolBlockingPB>() {
-        @Override
-        public DAGClientAMProtocolBlockingPB run() throws IOException {
-          RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
-              ProtobufRpcEngine.class);
-          return (DAGClientAMProtocolBlockingPB) RPC.getProxy(
-              DAGClientAMProtocolBlockingPB.class, 0, serviceAddr, conf);
-        }
-      });
-    } catch (InterruptedException e) {
-      throw new TezException(e);
-    }
+
+    proxy = TezClientUtils.getAMProxy(conf, appReport.getHost(), appReport.getRpcPort(),
+        appReport.getClientToAMToken());
     return true;
   }