You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/11/09 07:37:08 UTC

git commit: TEZ-607. Client DAG AM communication broken for token based security (bikas)

Updated Branches:
  refs/heads/master ae541f78d -> 8a259dfc3


TEZ-607. Client DAG AM communication broken for token based security (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8a259dfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8a259dfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8a259dfc

Branch: refs/heads/master
Commit: 8a259dfc3dc8de2222948f769e6c9b3d02bec688
Parents: ae541f7
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Nov 8 22:33:48 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Nov 8 22:33:48 2013 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/api/client/DAGClientServer.java | 17 +++++++++++++++--
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  |  2 +-
 .../org/apache/tez/dag/app/rm/TaskScheduler.java   |  7 +++++--
 .../app/rm/TaskSchedulerAppCallbackWrapper.java    | 12 ++++++++----
 .../tez/dag/app/rm/TaskSchedulerEventHandler.java  |  5 ++++-
 .../apache/tez/dag/app/rm/TestTaskScheduler.java   |  5 ++++-
 .../tez/dag/app/rm/TestTaskSchedulerHelpers.java   |  5 +++--
 7 files changed, 40 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
index 2b5fb6d..91fd8b4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.api.client;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +30,8 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
@@ -41,13 +44,16 @@ import com.google.protobuf.BlockingService;
 public class DAGClientServer extends AbstractService {
   static final Log LOG = LogFactory.getLog(DAGClientServer.class);
 
+  ClientToAMTokenSecretManager secretManager;
   DAGClientHandler realInstance;
   Server server;
   InetSocketAddress bindAddress;
 
-  public DAGClientServer(DAGClientHandler realInstance) {
+  public DAGClientServer(DAGClientHandler realInstance,
+      ApplicationAttemptId attemptId) {
     super("DAGClientRPCServer");
     this.realInstance = realInstance;
+    this.secretManager = new ClientToAMTokenSecretManager(attemptId, null);
   }
 
   @Override
@@ -88,6 +94,13 @@ public class DAGClientServer extends AbstractService {
   public InetSocketAddress getBindAddress() {
     return bindAddress;
   }
+  
+  public void setClientAMSecretKey(ByteBuffer key) {
+    if (key != null && key.hasRemaining()) {
+      // non-empty key. must be useful
+      secretManager.setMasterKey(key.array());
+    }
+  }
 
   private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
       int numHandlers,
@@ -96,7 +109,7 @@ public class DAGClientServer extends AbstractService {
     RPC.Server server = new RPC.Builder(conf).setProtocol(pbProtocol)
         .setInstance(blockingService).setBindAddress(addr.getHostName())
         .setPort(addr.getPort()).setNumHandlers(numHandlers).setVerbose(false)
-        .setPortRangeConfig(portRangeConfig)
+        .setPortRangeConfig(portRangeConfig).setSecretManager(secretManager)
         .build();
     server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
     return server;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 10d05ff..eb6c16c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -256,7 +256,7 @@ public class DAGAppMaster extends AbstractService {
     dispatcher = createDispatcher();
     addIfService(dispatcher, false);
 
-    clientRpcServer = new DAGClientServer(clientHandler);
+    clientRpcServer = new DAGClientServer(clientHandler, appAttemptID);
     addIfService(clientRpcServer, true);
 
     taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index ccc4a94..b2d3ffb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.rm;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -104,7 +105,8 @@ public class TaskScheduler extends AbstractService
     public void appShutdownRequested();
     public void setApplicationRegistrationData(
                                 Resource maxContainerCapability,
-                                Map<ApplicationAccessType, String> appAcls
+                                Map<ApplicationAccessType, String> appAcls,
+                                ByteBuffer clientAMSecretKey
                                 );
     public void onError(Throwable t);
     public float getProgress();
@@ -335,7 +337,8 @@ public class TaskScheduler extends AbstractService
       // upcall to app outside locks
       appClientDelegate.setApplicationRegistrationData(
           response.getMaximumResourceCapability(),
-          response.getApplicationACLs());
+          response.getApplicationACLs(),
+          response.getClientToAMTokenMasterKey());
 
       delayedContainerManager.start();
     } catch (YarnException e) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
index 059a190..1f11a03 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.app.rm;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -98,9 +99,9 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
 
   @Override
   public void setApplicationRegistrationData(Resource maxContainerCapability,
-      Map<ApplicationAccessType, String> appAcls) {
+      Map<ApplicationAccessType, String> appAcls, ByteBuffer key) {
     completionService.submit(new SetApplicationRegistrationDataCallable(real,
-        maxContainerCapability, appAcls));
+        maxContainerCapability, appAcls, key));
   }
 
   @Override
@@ -234,18 +235,21 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
 
     private final Resource maxContainerCapability;
     private final Map<ApplicationAccessType, String> appAcls;
+    private final ByteBuffer key;
 
     public SetApplicationRegistrationDataCallable(TaskSchedulerAppCallback app,
         Resource maxContainerCapability,
-        Map<ApplicationAccessType, String> appAcls) {
+        Map<ApplicationAccessType, String> appAcls,
+        ByteBuffer key) {
       super(app);
       this.maxContainerCapability = maxContainerCapability;
       this.appAcls = appAcls;
+      this.key = key;
     }
 
     @Override
     public Void call() throws Exception {
-      app.setApplicationRegistrationData(maxContainerCapability, appAcls);
+      app.setApplicationRegistrationData(maxContainerCapability, appAcls, key);
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 18592fa..3d80d0c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.rm;
 
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -444,10 +445,12 @@ public class TaskSchedulerEventHandler extends AbstractService
   @Override
   public synchronized void setApplicationRegistrationData(
       Resource maxContainerCapability,
-      Map<ApplicationAccessType, String> appAcls) {
+      Map<ApplicationAccessType, String> appAcls, 
+      ByteBuffer clientAMSecretKey) {
     this.appContext.getClusterInfo().setMaxContainerCapability(
         maxContainerCapability);
     this.appAcls = appAcls;
+    this.clientService.setClientAMSecretKey(clientAMSecretKey);
   }
 
   // Not synchronized to avoid deadlocks from TaskScheduler callbacks.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 1ae7d70..0e4285c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -121,6 +122,8 @@ public class TestTaskScheduler {
     when(mockRegResponse.getMaximumResourceCapability()).
                                                    thenReturn(mockMaxResource);
     when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
+    ByteBuffer mockKey = mock(ByteBuffer.class);
+    when(mockRegResponse.getClientToAMTokenMasterKey()).thenReturn(mockKey);
     when(mockRMClient.
           registerApplicationMaster(anyString(), anyInt(), anyString())).
                                                    thenReturn(mockRegResponse);
@@ -129,7 +132,7 @@ public class TestTaskScheduler {
     verify(mockRMClient).start();
     verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
     verify(mockApp).setApplicationRegistrationData(mockMaxResource,
-                                                   mockAcls);
+                                                   mockAcls, mockKey);
 
     when(mockRMClient.getClusterNodeCount()).thenReturn(5);
     Assert.assertEquals(5, scheduler.getClusterNodeCount());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a259dfc/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index bd81618..9557456 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -262,9 +263,9 @@ class TestTaskSchedulerHelpers {
 
     @Override
     public void setApplicationRegistrationData(Resource maxContainerCapability,
-        Map<ApplicationAccessType, String> appAcls) {
+        Map<ApplicationAccessType, String> appAcls, ByteBuffer key) {
       invocations++;
-      real.setApplicationRegistrationData(maxContainerCapability, appAcls);
+      real.setApplicationRegistrationData(maxContainerCapability, appAcls, key);
     }
 
     @Override