You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/11/09 07:37:08 UTC
git commit: TEZ-607. Client DAG AM communication broken for token
based security (bikas)
Updated Branches:
refs/heads/master ae541f78d -> 8a259dfc3
TEZ-607. Client DAG AM communication broken for token based security (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8a259dfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8a259dfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8a259dfc
Branch: refs/heads/master
Commit: 8a259dfc3dc8de2222948f769e6c9b3d02bec688
Parents: ae541f7
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Nov 8 22:33:48 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Nov 8 22:33:48 2013 -0800
----------------------------------------------------------------------
.../apache/tez/dag/api/client/DAGClientServer.java | 17 +++++++++++++++--
.../java/org/apache/tez/dag/app/DAGAppMaster.java | 2 +-
.../org/apache/tez/dag/app/rm/TaskScheduler.java | 7 +++++--
.../app/rm/TaskSchedulerAppCallbackWrapper.java | 12 ++++++++----
.../tez/dag/app/rm/TaskSchedulerEventHandler.java | 5 ++++-
.../apache/tez/dag/app/rm/TestTaskScheduler.java | 5 ++++-
.../tez/dag/app/rm/TestTaskSchedulerHelpers.java | 5 +++--
7 files changed, 40 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
index 2b5fb6d..91fd8b4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.api.client;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,6 +30,8 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
@@ -41,13 +44,16 @@ import com.google.protobuf.BlockingService;
public class DAGClientServer extends AbstractService {
static final Log LOG = LogFactory.getLog(DAGClientServer.class);
+ ClientToAMTokenSecretManager secretManager;
DAGClientHandler realInstance;
Server server;
InetSocketAddress bindAddress;
- public DAGClientServer(DAGClientHandler realInstance) {
+ public DAGClientServer(DAGClientHandler realInstance,
+ ApplicationAttemptId attemptId) {
super("DAGClientRPCServer");
this.realInstance = realInstance;
+ this.secretManager = new ClientToAMTokenSecretManager(attemptId, null);
}
@Override
@@ -88,6 +94,13 @@ public class DAGClientServer extends AbstractService {
public InetSocketAddress getBindAddress() {
return bindAddress;
}
+
+ public void setClientAMSecretKey(ByteBuffer key) {
+ if (key != null && key.hasRemaining()) {
+ // non-empty key. must be useful
+ secretManager.setMasterKey(key.array());
+ }
+ }
private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
int numHandlers,
@@ -96,7 +109,7 @@ public class DAGClientServer extends AbstractService {
RPC.Server server = new RPC.Builder(conf).setProtocol(pbProtocol)
.setInstance(blockingService).setBindAddress(addr.getHostName())
.setPort(addr.getPort()).setNumHandlers(numHandlers).setVerbose(false)
- .setPortRangeConfig(portRangeConfig)
+ .setPortRangeConfig(portRangeConfig).setSecretManager(secretManager)
.build();
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
return server;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 10d05ff..eb6c16c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -256,7 +256,7 @@ public class DAGAppMaster extends AbstractService {
dispatcher = createDispatcher();
addIfService(dispatcher, false);
- clientRpcServer = new DAGClientServer(clientHandler);
+ clientRpcServer = new DAGClientServer(clientHandler, appAttemptID);
addIfService(clientRpcServer, true);
taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index ccc4a94..b2d3ffb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app.rm;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -104,7 +105,8 @@ public class TaskScheduler extends AbstractService
public void appShutdownRequested();
public void setApplicationRegistrationData(
Resource maxContainerCapability,
- Map<ApplicationAccessType, String> appAcls
+ Map<ApplicationAccessType, String> appAcls,
+ ByteBuffer clientAMSecretKey
);
public void onError(Throwable t);
public float getProgress();
@@ -335,7 +337,8 @@ public class TaskScheduler extends AbstractService
// upcall to app outside locks
appClientDelegate.setApplicationRegistrationData(
response.getMaximumResourceCapability(),
- response.getApplicationACLs());
+ response.getApplicationACLs(),
+ response.getClientToAMTokenMasterKey());
delayedContainerManager.start();
} catch (YarnException e) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
index 059a190..1f11a03 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.app.rm;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -98,9 +99,9 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
@Override
public void setApplicationRegistrationData(Resource maxContainerCapability,
- Map<ApplicationAccessType, String> appAcls) {
+ Map<ApplicationAccessType, String> appAcls, ByteBuffer key) {
completionService.submit(new SetApplicationRegistrationDataCallable(real,
- maxContainerCapability, appAcls));
+ maxContainerCapability, appAcls, key));
}
@Override
@@ -234,18 +235,21 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
private final Resource maxContainerCapability;
private final Map<ApplicationAccessType, String> appAcls;
+ private final ByteBuffer key;
public SetApplicationRegistrationDataCallable(TaskSchedulerAppCallback app,
Resource maxContainerCapability,
- Map<ApplicationAccessType, String> appAcls) {
+ Map<ApplicationAccessType, String> appAcls,
+ ByteBuffer key) {
super(app);
this.maxContainerCapability = maxContainerCapability;
this.appAcls = appAcls;
+ this.key = key;
}
@Override
public Void call() throws Exception {
- app.setApplicationRegistrationData(maxContainerCapability, appAcls);
+ app.setApplicationRegistrationData(maxContainerCapability, appAcls, key);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 18592fa..3d80d0c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app.rm;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -444,10 +445,12 @@ public class TaskSchedulerEventHandler extends AbstractService
@Override
public synchronized void setApplicationRegistrationData(
Resource maxContainerCapability,
- Map<ApplicationAccessType, String> appAcls) {
+ Map<ApplicationAccessType, String> appAcls,
+ ByteBuffer clientAMSecretKey) {
this.appContext.getClusterInfo().setMaxContainerCapability(
maxContainerCapability);
this.appAcls = appAcls;
+ this.clientService.setClientAMSecretKey(clientAMSecretKey);
}
// Not synchronized to avoid deadlocks from TaskScheduler callbacks.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 1ae7d70..0e4285c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -121,6 +122,8 @@ public class TestTaskScheduler {
when(mockRegResponse.getMaximumResourceCapability()).
thenReturn(mockMaxResource);
when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
+ ByteBuffer mockKey = mock(ByteBuffer.class);
+ when(mockRegResponse.getClientToAMTokenMasterKey()).thenReturn(mockKey);
when(mockRMClient.
registerApplicationMaster(anyString(), anyInt(), anyString())).
thenReturn(mockRegResponse);
@@ -129,7 +132,7 @@ public class TestTaskScheduler {
verify(mockRMClient).start();
verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
verify(mockApp).setApplicationRegistrationData(mockMaxResource,
- mockAcls);
+ mockAcls, mockKey);
when(mockRMClient.getClusterNodeCount()).thenReturn(5);
Assert.assertEquals(5, scheduler.getClusterNodeCount());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index bd81618..9557456 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -262,9 +263,9 @@ class TestTaskSchedulerHelpers {
@Override
public void setApplicationRegistrationData(Resource maxContainerCapability,
- Map<ApplicationAccessType, String> appAcls) {
+ Map<ApplicationAccessType, String> appAcls, ByteBuffer key) {
invocations++;
- real.setApplicationRegistrationData(maxContainerCapability, appAcls);
+ real.setApplicationRegistrationData(maxContainerCapability, appAcls, key);
}
@Override