You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/11/21 00:57:38 UTC
git commit: TEZ-633. Fix client security for Tez session APIs. (sseth)
Updated Branches:
refs/heads/master c171e963f -> 7a3071d8d
TEZ-633. Fix client security for Tez session APIs. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7a3071d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7a3071d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7a3071d8
Branch: refs/heads/master
Commit: 7a3071d8d7f5ac6ff0f8f4a42d39961cbe3a692d
Parents: c171e96
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Nov 20 15:57:21 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Nov 20 15:57:21 2013 -0800
----------------------------------------------------------------------
.../org/apache/tez/client/TezClientUtils.java | 42 +++++++++++++++-----
.../dag/api/client/rpc/DAGClientRPCImpl.java | 38 ++----------------
2 files changed, 35 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7a3071d8/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 6f5d098..df15b53 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -35,6 +36,7 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -47,6 +49,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -601,19 +605,35 @@ public class TezClientUtils {
} catch (YarnException e) {
throw new TezException(e);
}
- return getAMProxy(conf, appReport.getHost(), appReport.getRpcPort());
+ return getAMProxy(conf, appReport.getHost(), appReport.getRpcPort(), appReport.getClientToAMToken());
}
- static DAGClientAMProtocolBlockingPB getAMProxy(Configuration conf,
- String amHost, int amRpcPort) throws IOException {
- InetSocketAddress addr = new InetSocketAddress(amHost,
- amRpcPort);
-
- RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
- ProtobufRpcEngine.class);
- DAGClientAMProtocolBlockingPB proxy =
- (DAGClientAMProtocolBlockingPB) RPC.getProxy(
- DAGClientAMProtocolBlockingPB.class, 0, addr, conf);
+ @Private
+ public static DAGClientAMProtocolBlockingPB getAMProxy(final Configuration conf, String amHost,
+ int amRpcPort, org.apache.hadoop.yarn.api.records.Token clientToAMToken) throws IOException {
+
+ final InetSocketAddress serviceAddr = new InetSocketAddress(amHost, amRpcPort);
+ UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(UserGroupInformation
+ .getCurrentUser().getUserName());
+ if (clientToAMToken != null) {
+ Token<ClientToAMTokenIdentifier> token = ConverterUtils.convertFromYarn(clientToAMToken,
+ serviceAddr);
+ userUgi.addToken(token);
+ }
+ LOG.debug("Connecting to " + serviceAddr);
+ DAGClientAMProtocolBlockingPB proxy = null;
+ try {
+ proxy = userUgi.doAs(new PrivilegedExceptionAction<DAGClientAMProtocolBlockingPB>() {
+ @Override
+ public DAGClientAMProtocolBlockingPB run() throws IOException {
+ RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class, ProtobufRpcEngine.class);
+ return (DAGClientAMProtocolBlockingPB) RPC.getProxy(DAGClientAMProtocolBlockingPB.class,
+ 0, serviceAddr, conf);
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new IOException("Failed to connect to AM", e);
+ }
return proxy;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7a3071d8/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 07074a2..8afdf05 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -19,18 +19,12 @@
package org.apache.tez.dag.api.client.rpc;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -38,8 +32,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.TezClientUtils;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
@@ -306,32 +299,9 @@ public class DAGClientRPCImpl implements DAGClient {
// attempt not running
return false;
}
-
- UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
- UserGroupInformation.getCurrentUser().getUserName());
- final InetSocketAddress serviceAddr = NetUtils.createSocketAddrForHost(
- appReport.getHost(), appReport.getRpcPort());
- org.apache.hadoop.yarn.api.records.Token clientToAMToken =
- appReport.getClientToAMToken();
- if (clientToAMToken != null) {
- Token<ClientToAMTokenIdentifier> token =
- ConverterUtils.convertFromYarn(clientToAMToken, serviceAddr);
- newUgi.addToken(token);
- }
- LOG.debug("Connecting to " + serviceAddr);
- try {
- proxy = newUgi.doAs(new PrivilegedExceptionAction<DAGClientAMProtocolBlockingPB>() {
- @Override
- public DAGClientAMProtocolBlockingPB run() throws IOException {
- RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
- ProtobufRpcEngine.class);
- return (DAGClientAMProtocolBlockingPB) RPC.getProxy(
- DAGClientAMProtocolBlockingPB.class, 0, serviceAddr, conf);
- }
- });
- } catch (InterruptedException e) {
- throw new TezException(e);
- }
+
+ proxy = TezClientUtils.getAMProxy(conf, appReport.getHost(), appReport.getRpcPort(),
+ appReport.getClientToAMToken());
return true;
}